feat: PRT provider load rate (#1720)
* load rate report in trailer

* fix trailer name

* fix lint

* fix load manager logic

* fix lint

* fix spelling

* fix logic

* fixed flag & header names

* fix load provider manager and creation logic

* fix logs for relay load rate

* fix rpcprovider server relay load handling

* fix tests

* fix typo

* fix init lava script

* fix provider load manager

* fix provider server and load manager

* fix lint - fix protocol test

* fix provider load manager applyProviderLoadMetadataToContextTrailer

* change cmdRPCProvider load rate flag to uint64

* try fix

* fix cmd flag reading

* adjusting uint64

* fix redundant nil check in provider load manager

* fix providerLoadManager per chain creation

* rename and fix instance passing unnecessarily

* fixed chainlib common formatting

* fix provider load manager comments

* fix e2e tests

* fix pr - unite add relay load and set trailer

* fix common.go provider load header

* fix edge case of getProviderLoad

* fix command flag description

* fix command flag description

* add metric for load rate

* fix division to be float and not uint

* roll back init lava only with node two consumers

* fix load metric

* merge main

* Update protocol/chainlib/common.go

Co-authored-by: Elad Gildnur <[email protected]>

* fix load calculation

* tidy code

* changing rate limit to 1k

* fix bug

* fix pr

* v4

* fix pr

* fix

---------

Co-authored-by: leon mandel <[email protected]>
Co-authored-by: Ran Mishael <[email protected]>
Co-authored-by: Leon Magma <[email protected]>
Co-authored-by: Elad Gildnur <[email protected]>
Co-authored-by: Omer <[email protected]>
6 people authored Oct 30, 2024
1 parent 479a906 commit d677c37
Showing 10 changed files with 144 additions and 45 deletions.
2 changes: 2 additions & 0 deletions protocol/chainlib/common.go
@@ -30,11 +30,13 @@ const (
relayMsgLogMaxChars = 200
RPCProviderNodeAddressHash = "Lava-Provider-Node-Address-Hash"
RPCProviderNodeExtension = "Lava-Provider-Node-Extension"
RpcProviderLoadRateHeader = "Lava-Provider-Load-Rate"
RpcProviderUniqueIdHeader = "Lava-Provider-Unique-Id"
WebSocketExtension = "websocket"
)

var (
TrailersToAddToHeaderResponse = []string{RPCProviderNodeExtension, RpcProviderLoadRateHeader}
InvalidResponses = []string{"null", "", "nil", "undefined"}
FailedSendingSubscriptionToClients = sdkerrors.New("failed Sending Subscription To Clients", 1015, "Failed Sending Subscription To Clients connection might have been closed by the user")
NoActiveSubscriptionFound = sdkerrors.New("failed finding an active subscription on provider side", 1016, "no active subscriptions for hashed params.")
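The new Lava-Provider-Load-Rate value travels as a gRPC trailer and, via TrailersToAddToHeaderResponse, is surfaced to consumers alongside the node-extension header. A minimal sketch of reading it back on the receiving side (the header name comes from this diff; the trailer value here is illustrative):

    package main

    import (
    	"fmt"
    	"strconv"

    	"google.golang.org/grpc/metadata"
    )

    func main() {
    	// Simulate the trailer a provider attaches with grpc.SetTrailer.
    	trailer := metadata.Pairs("Lava-Provider-Load-Rate", "0.25")

    	// Reader side: metadata keys are case-insensitive; take the first value.
    	if vals := trailer.Get("Lava-Provider-Load-Rate"); len(vals) > 0 {
    		if load, err := strconv.ParseFloat(vals[0], 64); err == nil {
    			fmt.Printf("provider load rate: %.2f\n", load) // prints 0.25
    		}
    	}
    }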
1 change: 1 addition & 0 deletions protocol/common/cobra_common.go
@@ -46,6 +46,7 @@ const (
// websocket flags
RateLimitWebSocketFlag = "rate-limit-websocket-requests-per-connection"
BanDurationForWebsocketRateLimitExceededFlag = "ban-duration-for-websocket-rate-limit-exceeded"
RateLimitRequestPerSecondFlag = "rate-limit-requests-per-second"
)

const (
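Like the other rate-limit flags above, the new flag is passed on the provider command line; a hypothetical invocation (mirroring the example command that appears later in this diff):

    rpcprovider 127.0.0.1:3333 OSMOSIS tendermintrpc "wss://www.node-path.com:80,https://www.node-path.com:80" --rate-limit-requests-per-second 1000

The value 1000 matches the "changing rate limit to 1k" step in the commit history; leaving it at 0 disables load measurement entirely.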
2 changes: 1 addition & 1 deletion protocol/integration/protocol_test.go
@@ -344,7 +344,7 @@ func createRpcProvider(t *testing.T, ctx context.Context, rpcProviderOptions rpc
chainTracker.StartAndServe(ctx)
reliabilityManager := reliabilitymanager.NewReliabilityManager(chainTracker, &mockProviderStateTracker, rpcProviderOptions.account.Addr.String(), chainRouter, chainParser)
mockReliabilityManager := NewMockReliabilityManager(reliabilityManager)
rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rws, providerSessionManager, mockReliabilityManager, rpcProviderOptions.account.SK, cache, chainRouter, &mockProviderStateTracker, rpcProviderOptions.account.Addr, rpcProviderOptions.lavaChainID, rpcprovider.DEFAULT_ALLOWED_MISSING_CU, nil, nil, nil, false)
rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rws, providerSessionManager, mockReliabilityManager, rpcProviderOptions.account.SK, cache, chainRouter, &mockProviderStateTracker, rpcProviderOptions.account.Addr, rpcProviderOptions.lavaChainID, rpcprovider.DEFAULT_ALLOWED_MISSING_CU, nil, nil, nil, false, nil)
listener := rpcprovider.NewProviderListener(ctx, rpcProviderEndpoint.NetworkAddress, "/health")
err = listener.RegisterReceiver(rpcProviderServer, rpcProviderEndpoint)
require.NoError(t, err)
10 changes: 10 additions & 0 deletions protocol/metrics/provider_metrics.go
@@ -22,6 +22,7 @@ type ProviderMetrics struct {
totalRelaysServicedMetric *prometheus.CounterVec
totalErroredMetric *prometheus.CounterVec
consumerQoSMetric *prometheus.GaugeVec
loadRateMetric *prometheus.GaugeVec
}

func (pm *ProviderMetrics) AddRelay(consumerAddress string, cu uint64, qos *pairingtypes.QualityOfServiceReport) {
@@ -49,6 +50,13 @@ func (pm *ProviderMetrics) AddRelay(consumerAddress string, cu uint64, qos *pair
}
}

func (pm *ProviderMetrics) SetLoadRate(loadRate float64) {
if pm == nil {
return
}
pm.loadRateMetric.WithLabelValues(pm.specID).Set(loadRate)
}

func (pm *ProviderMetrics) AddPayment(cu uint64) {
if pm == nil {
return
@@ -72,6 +80,7 @@ func NewProviderMetrics(specID, apiInterface string, totalCUServicedMetric *prom
totalRelaysServicedMetric *prometheus.CounterVec,
totalErroredMetric *prometheus.CounterVec,
consumerQoSMetric *prometheus.GaugeVec,
loadRateMetric *prometheus.GaugeVec,
) *ProviderMetrics {
pm := &ProviderMetrics{
specID: specID,
@@ -82,6 +91,7 @@ func NewProviderMetrics(specID, apiInterface string, totalCUServicedMetric *prom
totalRelaysServicedMetric: totalRelaysServicedMetric,
totalErroredMetric: totalErroredMetric,
consumerQoSMetric: consumerQoSMetric,
loadRateMetric: loadRateMetric,
}
return pm
}
10 changes: 9 additions & 1 deletion protocol/metrics/provider_metrics_manager.go
@@ -41,6 +41,7 @@ type ProviderMetricsManager struct {
endpointsHealthChecksOk uint64
relaysMonitors map[string]*RelaysMonitor
relaysMonitorsLock sync.RWMutex
loadRateMetric *prometheus.GaugeVec
}

func NewProviderMetricsManager(networkAddress string) *ProviderMetricsManager {
@@ -107,6 +108,11 @@ func NewProviderMetricsManager(networkAddress string) *ProviderMetricsManager {
Help: "The total number of get latest block queries that succeeded by chainfetcher",
}, []string{"spec"})

loadRateMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "lava_provider_load_rate",
Help: "The load rate according to the load rate limit - Given Y simultaneous relay calls, a value of X and will measure Y/X load rate.",
}, []string{"spec"})

fetchBlockSuccessMetric := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "lava_provider_fetch_block_success",
Help: "The total number of get specific block queries that succeeded by chainfetcher",
@@ -141,6 +147,7 @@ func NewProviderMetricsManager(networkAddress string) *ProviderMetricsManager {
prometheus.MustRegister(virtualEpochMetric)
prometheus.MustRegister(endpointsHealthChecksOkMetric)
prometheus.MustRegister(protocolVersionMetric)
prometheus.MustRegister(loadRateMetric)

providerMetricsManager := &ProviderMetricsManager{
providerMetrics: map[string]*ProviderMetrics{},
@@ -161,6 +168,7 @@ func NewProviderMetricsManager(networkAddress string) *ProviderMetricsManager {
endpointsHealthChecksOk: 1,
protocolVersionMetric: protocolVersionMetric,
relaysMonitors: map[string]*RelaysMonitor{},
loadRateMetric: loadRateMetric,
}

http.Handle("/metrics", promhttp.Handler())
@@ -209,7 +217,7 @@ func (pme *ProviderMetricsManager) AddProviderMetrics(specID, apiInterface strin
}

if pme.getProviderMetric(specID, apiInterface) == nil {
providerMetric := NewProviderMetrics(specID, apiInterface, pme.totalCUServicedMetric, pme.totalCUPaidMetric, pme.totalRelaysServicedMetric, pme.totalErroredMetric, pme.consumerQoSMetric)
providerMetric := NewProviderMetrics(specID, apiInterface, pme.totalCUServicedMetric, pme.totalCUPaidMetric, pme.totalRelaysServicedMetric, pme.totalErroredMetric, pme.consumerQoSMetric, pme.loadRateMetric)
pme.setProviderMetric(providerMetric)

endpoint := fmt.Sprintf("/metrics/%s/%s/health", specID, apiInterface)
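Since the manager already serves promhttp on /metrics, the new gauge can be checked with a plain scrape once a provider is running; a hypothetical check (the listen address depends on the metrics flag passed to the provider, and the output line is illustrative):

    curl -s http://127.0.0.1:7780/metrics | grep lava_provider_load_rate
    # lava_provider_load_rate{spec="LAV1"} 0.25

The spec label is filled per chain when AddProviderMetrics wires the shared gauge into each chain's ProviderMetrics.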
23 changes: 15 additions & 8 deletions protocol/rpcconsumer/rpcconsumer_server.go
@@ -1264,6 +1264,20 @@ func (rpccs *RPCConsumerServer) HandleDirectiveHeadersForMessage(chainMessage ch
chainMessage.SetForceCacheRefresh(ok)
}

// getMetadataFromRelayTrailer iterates over metadataHeaders and appends each trailer found in relayResult.ProviderTrailer to relayResult.Reply.Metadata
func (rpccs *RPCConsumerServer) getMetadataFromRelayTrailer(metadataHeaders []string, relayResult *common.RelayResult) {
for _, metadataHeader := range metadataHeaders {
trailerValue := relayResult.ProviderTrailer.Get(metadataHeader)
if len(trailerValue) > 0 {
extensionMD := pairingtypes.Metadata{
Name: metadataHeader,
Value: trailerValue[0],
}
relayResult.Reply.Metadata = append(relayResult.Reply.Metadata, extensionMD)
}
}
}

func (rpccs *RPCConsumerServer) appendHeadersToRelayResult(ctx context.Context, relayResult *common.RelayResult, protocolErrors uint64, relayProcessor *RelayProcessor, protocolMessage chainlib.ProtocolMessage, apiName string) {
if relayResult == nil {
return
@@ -1336,14 +1350,7 @@ func (rpccs *RPCConsumerServer) appendHeadersToRelayResult(ctx context.Context,
}

// fetch trailer information from the provider by using the provider trailer field.
providerNodeExtensions := relayResult.ProviderTrailer.Get(chainlib.RPCProviderNodeExtension)
if len(providerNodeExtensions) > 0 {
extensionMD := pairingtypes.Metadata{
Name: chainlib.RPCProviderNodeExtension,
Value: providerNodeExtensions[0],
}
relayResult.Reply.Metadata = append(relayResult.Reply.Metadata, extensionMD)
}
rpccs.getMetadataFromRelayTrailer(chainlib.TrailersToAddToHeaderResponse, relayResult)

directiveHeaders := protocolMessage.GetDirectiveHeaders()
_, debugRelays := directiveHeaders[common.LAVA_DEBUG_RELAY]
55 changes: 55 additions & 0 deletions protocol/rpcprovider/provider_load_manager.go
@@ -0,0 +1,55 @@
package rpcprovider

import (
"context"
"strconv"
"sync/atomic"

"github.com/lavanet/lava/v4/protocol/chainlib"
grpc "google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

type ProviderLoadManager struct {
rateLimitThreshold uint64
activeRequestsPerSecond atomic.Uint64
}

func NewProviderLoadManager(rateLimitThreshold uint64) *ProviderLoadManager {
if rateLimitThreshold == 0 {
return nil
}
loadManager := &ProviderLoadManager{rateLimitThreshold: rateLimitThreshold}
return loadManager
}

func (loadManager *ProviderLoadManager) subtractRelayCall() {
if loadManager == nil {
return
}
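// atomic.Uint64 has no Sub method; adding ^uint64(0) (two's-complement -1) atomically decrements the counter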
loadManager.activeRequestsPerSecond.Add(^uint64(0))
}

func (loadManager *ProviderLoadManager) getProviderLoad(activeRequests uint64) float64 {
rateLimitThreshold := loadManager.rateLimitThreshold
if rateLimitThreshold == 0 {
return 0
}
return float64(activeRequests) / float64(rateLimitThreshold)
}

// addAndSetRelayLoadToContextTrailer increments the active relay count, computes the current load, and attaches it to the context trailer
func (loadManager *ProviderLoadManager) addAndSetRelayLoadToContextTrailer(ctx context.Context) float64 {
if loadManager == nil {
return 0
}
activeRequestsPerSecond := loadManager.activeRequestsPerSecond.Add(1)
providerRelayLoad := loadManager.getProviderLoad(activeRequestsPerSecond)
if providerRelayLoad == 0 {
return providerRelayLoad
}
formattedProviderLoad := strconv.FormatFloat(providerRelayLoad, 'f', -1, 64)
trailerMd := metadata.Pairs(chainlib.RpcProviderLoadRateHeader, formattedProviderLoad)
grpc.SetTrailer(ctx, trailerMd)
return providerRelayLoad
}
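Two details worth noting: NewProviderLoadManager returns a nil manager when the threshold is 0, and every method nil-checks its receiver, so providers that never set the flag skip load accounting entirely; and the load itself is simply active relays divided by the threshold. A self-contained sketch of that arithmetic (reimplemented for illustration, not the repository's types):

    package main

    import (
    	"fmt"
    	"sync/atomic"
    )

    type loadCounter struct {
    	threshold uint64        // the configured rate limit, e.g. 1000
    	active    atomic.Uint64 // relays currently in flight
    }

    func (lc *loadCounter) load() float64 {
    	if lc.threshold == 0 {
    		return 0 // unlimited: never report load
    	}
    	return float64(lc.active.Load()) / float64(lc.threshold)
    }

    func main() {
    	lc := &loadCounter{threshold: 1000}
    	for i := 0; i < 250; i++ {
    		lc.active.Add(1) // 250 relays admitted
    	}
    	fmt.Println(lc.load()) // 0.25
    	lc.active.Add(^uint64(0)) // one relay finishes
    	fmt.Println(lc.load()) // 0.249
    }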
59 changes: 34 additions & 25 deletions protocol/rpcprovider/rpcprovider.go
@@ -110,6 +110,7 @@ type rpcProviderStartOptions struct {
healthCheckMetricsOptions *rpcProviderHealthCheckMetricsOptions
staticProvider bool
staticSpecPath string
relayLoadLimit uint64
}

type rpcProviderHealthCheckMetricsOptions struct {
@@ -123,24 +124,26 @@ type RPCProvider struct {
rpcProviderListeners map[string]*ProviderListener
lock sync.Mutex
// all of the following members need to be concurrency proof
providerMetricsManager *metrics.ProviderMetricsManager
rewardServer *rewardserver.RewardServer
privKey *btcec.PrivateKey
lavaChainID string
addr sdk.AccAddress
blockMemorySize uint64
chainMutexes map[string]*sync.Mutex
parallelConnections uint
cache *performance.Cache
shardID uint // shardID is a flag that allows setting up multiple provider databases of the same chain
chainTrackers *common.SafeSyncMap[string, *chaintracker.ChainTracker]
relaysMonitorAggregator *metrics.RelaysMonitorAggregator
relaysHealthCheckEnabled bool
relaysHealthCheckInterval time.Duration
grpcHealthCheckEndpoint string
providerUniqueId string
staticProvider bool
staticSpecPath string
providerMetricsManager *metrics.ProviderMetricsManager
rewardServer *rewardserver.RewardServer
privKey *btcec.PrivateKey
lavaChainID string
addr sdk.AccAddress
blockMemorySize uint64
chainMutexes map[string]*sync.Mutex
parallelConnections uint
cache *performance.Cache
shardID uint // shardID is a flag that allows setting up multiple provider databases of the same chain
chainTrackers *common.SafeSyncMap[string, *chaintracker.ChainTracker]
relaysMonitorAggregator *metrics.RelaysMonitorAggregator
relaysHealthCheckEnabled bool
relaysHealthCheckInterval time.Duration
grpcHealthCheckEndpoint string
providerUniqueId string
staticProvider bool
staticSpecPath string
relayLoadLimit uint64
providerLoadManagersPerChain *common.SafeSyncMap[string, *ProviderLoadManager]
}

func (rpcp *RPCProvider) Start(options *rpcProviderStartOptions) (err error) {
@@ -165,7 +168,8 @@ func (rpcp *RPCProvider) Start(options *rpcProviderStartOptions) (err error) {
rpcp.grpcHealthCheckEndpoint = options.healthCheckMetricsOptions.grpcHealthCheckEndpoint
rpcp.staticProvider = options.staticProvider
rpcp.staticSpecPath = options.staticSpecPath

rpcp.relayLoadLimit = options.relayLoadLimit
rpcp.providerLoadManagersPerChain = &common.SafeSyncMap[string, *ProviderLoadManager]{}
// single state tracker
lavaChainFetcher := chainlib.NewLavaChainFetcher(ctx, options.clientCtx)
providerStateTracker, err := statetracker.NewProviderStateTracker(ctx, options.txFactory, options.clientCtx, lavaChainFetcher, rpcp.providerMetricsManager)
@@ -307,9 +311,7 @@ func (rpcp *RPCProvider) SetupProviderEndpoints(rpcProviderEndpoints []*lavasess
wg.Add(parallelJobs)
disabledEndpoints := make(chan *lavasession.RPCProviderEndpoint, parallelJobs)
// validate static spec configuration is used only on a single chain setup.
chainIds := make(map[string]struct{})
for _, rpcProviderEndpoint := range rpcProviderEndpoints {
chainIds[rpcProviderEndpoint.ChainID] = struct{}{}
setupEndpoint := func(rpcProviderEndpoint *lavasession.RPCProviderEndpoint, specValidator *SpecValidator) {
defer wg.Done()
err := rpcp.SetupEndpoint(context.Background(), rpcProviderEndpoint, specValidator)
@@ -404,8 +406,8 @@ func (rpcp *RPCProvider) SetupEndpoint(ctx context.Context, rpcProviderEndpoint
utils.Attribute{Key: "Chain", Value: rpcProviderEndpoint.ChainID},
utils.Attribute{Key: "apiInterface", Value: apiInterface})
}

// in order to utilize shared resources between chains we need go routines with the same chain to wait for one another here
var loadManager *ProviderLoadManager
chainCommonSetup := func() error {
rpcp.chainMutexes[chainID].Lock()
defer rpcp.chainMutexes[chainID].Unlock()
@@ -450,6 +452,12 @@ func (rpcp *RPCProvider) SetupEndpoint(ctx context.Context, rpcProviderEndpoint
chainTracker = chainTrackerLoaded
utils.LavaFormatDebug("reusing chain tracker", utils.Attribute{Key: "chain", Value: rpcProviderEndpoint.ChainID})
}

// create provider load manager per chain ID
loadManager, _, err = rpcp.providerLoadManagersPerChain.LoadOrStore(rpcProviderEndpoint.ChainID, NewProviderLoadManager(rpcp.relayLoadLimit))
if err != nil {
utils.LavaFormatError("Failed LoadOrStore providerLoadManagersPerChain", err, utils.LogAttr("chainId", rpcProviderEndpoint.ChainID), utils.LogAttr("rpcp.relayLoadLimit", rpcp.relayLoadLimit))
}
return nil
}
err = chainCommonSetup()
@@ -485,8 +493,7 @@ func (rpcp *RPCProvider) SetupEndpoint(ctx context.Context, rpcProviderEndpoint
utils.LavaFormatTrace("Creating provider node subscription manager", utils.LogAttr("rpcProviderEndpoint", rpcProviderEndpoint))
providerNodeSubscriptionManager = chainlib.NewProviderNodeSubscriptionManager(chainRouter, chainParser, rpcProviderServer, rpcp.privKey)
}

rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rpcp.rewardServer, providerSessionManager, reliabilityManager, rpcp.privKey, rpcp.cache, chainRouter, rpcp.providerStateTracker, rpcp.addr, rpcp.lavaChainID, DEFAULT_ALLOWED_MISSING_CU, providerMetrics, relaysMonitor, providerNodeSubscriptionManager, rpcp.staticProvider)
rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rpcp.rewardServer, providerSessionManager, reliabilityManager, rpcp.privKey, rpcp.cache, chainRouter, rpcp.providerStateTracker, rpcp.addr, rpcp.lavaChainID, DEFAULT_ALLOWED_MISSING_CU, providerMetrics, relaysMonitor, providerNodeSubscriptionManager, rpcp.staticProvider, loadManager)
// set up grpc listener
var listener *ProviderListener
func() {
@@ -717,6 +724,7 @@ rpcprovider 127.0.0.1:3333 OSMOSIS tendermintrpc "wss://www.node-path.com:80,htt
if stickinessHeaderName != "" {
RPCProviderStickinessHeaderName = stickinessHeaderName
}
relayLoadLimit := viper.GetUint64(common.RateLimitRequestPerSecondFlag)
prometheusListenAddr := viper.GetString(metrics.MetricsListenFlagName)
rewardStoragePath := viper.GetString(rewardserver.RewardServerStorageFlagName)
rewardTTL := viper.GetDuration(rewardserver.RewardTTLFlagName)
@@ -754,6 +762,7 @@ rpcprovider 127.0.0.1:3333 OSMOSIS tendermintrpc "wss://www.node-path.com:80,htt
&rpcProviderHealthCheckMetricsOptions,
staticProvider,
offlineSpecPath,
relayLoadLimit,
}

rpcProvider := RPCProvider{}
@@ -790,7 +799,7 @@ rpcprovider 127.0.0.1:3333 OSMOSIS tendermintrpc "wss://www.node-path.com:80,htt
cmdRPCProvider.Flags().BoolVar(&chainlib.IgnoreSubscriptionNotConfiguredError, chainlib.IgnoreSubscriptionNotConfiguredErrorFlag, chainlib.IgnoreSubscriptionNotConfiguredError, "ignore webSocket node url not configured error, when subscription is enabled in spec")
cmdRPCProvider.Flags().IntVar(&numberOfRetriesAllowedOnNodeErrors, common.SetRelayCountOnNodeErrorFlag, 2, "set the number of retries attempt on node errors")
cmdRPCProvider.Flags().String(common.UseStaticSpecFlag, "", "load offline spec provided path to spec file, used to test specs before they are proposed on chain, example for spec with inheritance: --use-static-spec ./cookbook/specs/ibc.json,./cookbook/specs/tendermint.json,./cookbook/specs/cosmossdk.json,./cookbook/specs/ethermint.json,./cookbook/specs/ethereum.json,./cookbook/specs/evmos.json")

cmdRPCProvider.Flags().Uint64(common.RateLimitRequestPerSecondFlag, 0, "rate limit threshold used to measure provider load for feedback, per second and per chain; 0 (the default) disables the measurement. Given a threshold X and Y simultaneous relay calls, the reported load rate is Y/X.")
common.AddRollingLogConfig(cmdRPCProvider)
return cmdRPCProvider
}
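Because all endpoints of the same chain share one load manager, creation goes through providerLoadManagersPerChain.LoadOrStore inside the chain mutex. A standard-library sketch of the same idiom using sync.Map (the repo's SafeSyncMap wraps comparable semantics behind typed values and an error return; the names below are illustrative):

    package main

    import (
    	"fmt"
    	"sync"
    )

    type manager struct{ threshold uint64 }

    func main() {
    	var perChain sync.Map
    	// The first endpoint of a chain stores a fresh manager; later endpoints reuse it.
    	m1, loaded1 := perChain.LoadOrStore("LAV1", &manager{threshold: 1000})
    	m2, loaded2 := perChain.LoadOrStore("LAV1", &manager{threshold: 1000})
    	fmt.Println(loaded1, loaded2)               // false true
    	fmt.Println(m1.(*manager) == m2.(*manager)) // true: both endpoints share one manager
    }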
15 changes: 14 additions & 1 deletion protocol/rpcprovider/rpcprovider_server.go
@@ -71,6 +71,7 @@ type RPCProviderServer struct {
providerUniqueId string
StaticProvider bool
providerStateMachine *ProviderStateMachine
providerLoadManager *ProviderLoadManager
}

type ReliabilityManagerInf interface {
@@ -112,6 +113,7 @@ func (rpcps *RPCProviderServer) ServeRPCRequests(
relaysMonitor *metrics.RelaysMonitor,
providerNodeSubscriptionManager *chainlib.ProviderNodeSubscriptionManager,
staticProvider bool,
providerLoadManager *ProviderLoadManager,
) {
rpcps.cache = cache
rpcps.chainRouter = chainRouter
@@ -134,6 +136,7 @@ func (rpcps *RPCProviderServer) ServeRPCRequests(
rpcps.relaysMonitor = relaysMonitor
rpcps.providerNodeSubscriptionManager = providerNodeSubscriptionManager
rpcps.providerStateMachine = NewProviderStateMachine(rpcProviderEndpoint.ChainID, lavaprotocol.NewRelayRetriesManager(), chainRouter)
rpcps.providerLoadManager = providerLoadManager

rpcps.initRelaysMonitor(ctx)
}
@@ -180,7 +183,17 @@ func (rpcps *RPCProviderServer) craftChainMessage() (chainMessage chainlib.Chain

// function used to handle relay requests from a consumer, it is called by a provider_listener by calling RegisterReceiver
func (rpcps *RPCProviderServer) Relay(ctx context.Context, request *pairingtypes.RelayRequest) (*pairingtypes.RelayReply, error) {
grpc.SetTrailer(ctx, metadata.Pairs(chainlib.RpcProviderUniqueIdHeader, rpcps.providerUniqueId))
// count this relay toward the provider load and attach the current load rate to the context trailer
currentLoad := rpcps.providerLoadManager.addAndSetRelayLoadToContextTrailer(ctx)
defer func() {
// when the relay ends, subtract it from the active count and report the admission-time load in a goroutine, off the hot path.
go func() {
rpcps.providerLoadManager.subtractRelayCall()
rpcps.metrics.SetLoadRate(currentLoad)
}()
}()
trailerMd := metadata.Pairs(chainlib.RpcProviderUniqueIdHeader, rpcps.providerUniqueId)
grpc.SetTrailer(ctx, trailerMd)
if request.RelayData == nil || request.RelaySession == nil {
return nil, utils.LavaFormatWarning("invalid relay request, internal fields are nil", nil)
}
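On the provider side, Relay measures the load at admission and defers both the decrement and the metric write to a goroutine, so the gauge records the load each relay saw when it entered, not when it left. A condensed standalone sketch of that lifecycle (hypothetical names, mirroring the pattern in the hunk above):

    package main

    import (
    	"fmt"
    	"sync/atomic"
    )

    const threshold = 1000

    var active atomic.Uint64

    func handleRelay(done chan<- float64) {
    	current := float64(active.Add(1)) / threshold // load at admission
    	defer func() {
    		go func() {
    			active.Add(^uint64(0)) // release this relay's slot
    			done <- current        // stands in for metrics.SetLoadRate(current)
    		}()
    	}()
    	// ... serve the relay ...
    }

    func main() {
    	done := make(chan float64)
    	go handleRelay(done)
    	fmt.Printf("admission-time load: %.3f\n", <-done) // 0.001
    }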