Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: PRT provider load rate #1720

Merged
merged 58 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
7b8ea69
load rate report in trailer
Sep 30, 2024
4f7838a
fix trailer name
Sep 30, 2024
4fead99
merge main
Sep 30, 2024
ea1598b
fix lint
Sep 30, 2024
04f1762
fix load manager logic
Sep 30, 2024
eb2b345
fix lint
Sep 30, 2024
2c33935
fix spelling
Sep 30, 2024
46e6faf
fix logic
Sep 30, 2024
1f977ae
fixed flag & header names
Oct 1, 2024
6db7dd3
fix load provider manager and creation logic
Oct 1, 2024
b9e199b
fix logs for relay load rate
Oct 1, 2024
8b8a05a
fix rpcprovider server relay load handling
Oct 1, 2024
5e3b7a4
fix tests
Oct 1, 2024
199ff2c
fix typo
Oct 1, 2024
9faf8ef
fix init lava script
Oct 1, 2024
cc1af1e
Merge branch 'main' into prt-add-provider-relay-load-trailer
Oct 1, 2024
56191f6
fix provider load manager
Oct 1, 2024
9eabb5a
fix provider server and load manager
Oct 1, 2024
ffdb986
fix lint - fix protocol test
Oct 1, 2024
19cc454
fix provider load manager applyProviderLoadMetadataToContextTrailer
Oct 1, 2024
b73d267
Merge branch 'main' into prt-add-provider-relay-load-trailer
Oct 1, 2024
5a29efd
change cmdRPCProvider load rate flag to uint64
Oct 1, 2024
93c3220
try fix
Oct 1, 2024
eeb46ea
fix cmd flag reading
Oct 1, 2024
55221f6
adjusting uint64
ranlavanet Oct 2, 2024
e81afb9
fix redundant nil check in provider load manager
Oct 3, 2024
56c2b3f
Merge branch 'prt-add-provider-relay-load-trailer' of github.com:lava…
Oct 3, 2024
725f40a
fix providerLoadManager per chain creation
Oct 3, 2024
6da0e99
rename and fix instance passing unnecessarily
ranlavanet Oct 3, 2024
9458d6c
fixed chainlib common formatting
Oct 6, 2024
44a5e5c
fix provider load manager comments
Oct 6, 2024
03e1b17
fix e2e tests
Oct 6, 2024
c4bc4ec
fix pr - unite add relay load and set trailer
Oct 6, 2024
2bd9032
fix common.go provider load header
Oct 6, 2024
a44f0ab
fix edge case of getProviderLoad
Oct 6, 2024
50dab3f
Merge branch 'main' into prt-add-provider-relay-load-trailer
Oct 7, 2024
c88aee2
fix command flag description
Oct 8, 2024
30efbf1
fix command flag description
Oct 8, 2024
810db13
add metric for load rate
Oct 8, 2024
82f3020
fix division to be float and not uint
Oct 8, 2024
14d8c68
roll back init lava only with node two consumers
Oct 8, 2024
2d61289
fix load metric
Oct 8, 2024
280074b
merge branch 'main' into prt-add-provider-relay-load-trailer
Oct 9, 2024
97f72ec
merge main
Oct 9, 2024
454db08
Update protocol/chainlib/common.go
Oct 9, 2024
78ed595
Merge branch 'main' into prt-add-provider-relay-load-trailer
Oct 9, 2024
63a0e89
Merge branch 'prt-add-provider-relay-load-trailer' of github.com:lava…
Oct 9, 2024
d4c7258
fix load calculation
ranlavanet Oct 10, 2024
273a32a
tidy code
ranlavanet Oct 10, 2024
6fb7276
changing rate limit to 1k
ranlavanet Oct 10, 2024
272e676
fix bug
ranlavanet Oct 10, 2024
42486ac
fix pr
ranlavanet Oct 10, 2024
9484f9f
Merge branch 'main' into prt-add-provider-relay-load-trailer
omerlavanet Oct 15, 2024
ecd416c
Merge branch 'main' into prt-add-provider-relay-load-trailer
ranlavanet Oct 27, 2024
974dbcb
v4
ranlavanet Oct 27, 2024
258578f
Merge branch 'main' into prt-add-provider-relay-load-trailer
ranlavanet Oct 27, 2024
feeb373
fix pr
ranlavanet Oct 27, 2024
51894ce
fix
ranlavanet Oct 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions protocol/chainlib/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const (
relayMsgLogMaxChars = 200
RPCProviderNodeAddressHash = "Lava-Provider-Node-Address-Hash"
RPCProviderNodeExtension = "Lava-Provider-Node-Extension"
RpcProviderLoadRateHeader = "Lava-provider-load-rate"
shleikes marked this conversation as resolved.
Show resolved Hide resolved
RpcProviderUniqueIdHeader = "Lava-Provider-Unique-Id"
WebSocketExtension = "websocket"
)
Expand Down
1 change: 1 addition & 0 deletions protocol/common/cobra_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ const (
// websocket flags
RateLimitWebSocketFlag = "rate-limit-websocket-requests-per-connection"
BanDurationForWebsocketRateLimitExceededFlag = "ban-duration-for-websocket-rate-limit-exceeded"
RateLimitRequestPerSecondFlag = "rate-limit-requests-per-second"
)

const (
Expand Down
2 changes: 1 addition & 1 deletion protocol/integration/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func createRpcProvider(t *testing.T, ctx context.Context, rpcProviderOptions rpc
chainTracker.StartAndServe(ctx)
reliabilityManager := reliabilitymanager.NewReliabilityManager(chainTracker, &mockProviderStateTracker, rpcProviderOptions.account.Addr.String(), chainRouter, chainParser)
mockReliabilityManager := NewMockReliabilityManager(reliabilityManager)
rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rws, providerSessionManager, mockReliabilityManager, rpcProviderOptions.account.SK, cache, chainRouter, &mockProviderStateTracker, rpcProviderOptions.account.Addr, rpcProviderOptions.lavaChainID, rpcprovider.DEFAULT_ALLOWED_MISSING_CU, nil, nil, nil, false)
rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rws, providerSessionManager, mockReliabilityManager, rpcProviderOptions.account.SK, cache, chainRouter, &mockProviderStateTracker, rpcProviderOptions.account.Addr, rpcProviderOptions.lavaChainID, rpcprovider.DEFAULT_ALLOWED_MISSING_CU, nil, nil, nil, false, 0)
listener := rpcprovider.NewProviderListener(ctx, rpcProviderEndpoint.NetworkAddress, "/health")
err = listener.RegisterReceiver(rpcProviderServer, rpcProviderEndpoint)
require.NoError(t, err)
Expand Down
21 changes: 13 additions & 8 deletions protocol/rpcconsumer/rpcconsumer_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1260,6 +1260,17 @@ func (rpccs *RPCConsumerServer) HandleDirectiveHeadersForMessage(chainMessage ch
chainMessage.SetForceCacheRefresh(ok)
}

// getMetadataFromRelayTrailer copies a single trailer value sent by the
// provider into the relay reply's metadata, so the consumer's caller can
// observe it. Trailers without a value for metadataHeader are ignored.
func (rpccs *RPCConsumerServer) getMetadataFromRelayTrailer(metadataHeader string, relayResult *common.RelayResult) {
	values := relayResult.ProviderTrailer.Get(metadataHeader)
	if len(values) == 0 {
		return
	}
	// Only the first trailer value is forwarded, matching gRPC's convention
	// for single-valued metadata.
	relayResult.Reply.Metadata = append(relayResult.Reply.Metadata, pairingtypes.Metadata{
		Name:  metadataHeader,
		Value: values[0],
	})
}

func (rpccs *RPCConsumerServer) appendHeadersToRelayResult(ctx context.Context, relayResult *common.RelayResult, protocolErrors uint64, relayProcessor *RelayProcessor, protocolMessage chainlib.ProtocolMessage, apiName string) {
if relayResult == nil {
return
Expand Down Expand Up @@ -1332,14 +1343,8 @@ func (rpccs *RPCConsumerServer) appendHeadersToRelayResult(ctx context.Context,
}

// fetch trailer information from the provider by using the provider trailer field.
providerNodeExtensions := relayResult.ProviderTrailer.Get(chainlib.RPCProviderNodeExtension)
if len(providerNodeExtensions) > 0 {
extensionMD := pairingtypes.Metadata{
Name: chainlib.RPCProviderNodeExtension,
Value: providerNodeExtensions[0],
}
relayResult.Reply.Metadata = append(relayResult.Reply.Metadata, extensionMD)
}
rpccs.getMetadataFromRelayTrailer(chainlib.RPCProviderNodeExtension, relayResult)
rpccs.getMetadataFromRelayTrailer(chainlib.RpcProviderLoadRateHeader, relayResult)

directiveHeaders := protocolMessage.GetDirectiveHeaders()
_, debugRelays := directiveHeaders[common.LAVA_DEBUG_RELAY]
Expand Down
49 changes: 49 additions & 0 deletions protocol/rpcprovider/provider_load_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package rpcprovider

import (
"strconv"
"sync/atomic"
)

// ProviderLoadManager tracks the number of in-flight relay requests for a
// single chain and reports the provider's current load as a fraction of the
// configured rate-limit threshold.
//
// A nil *ProviderLoadManager is a valid "disabled" manager: every method is
// a nil-safe no-op, so callers never need an explicit enabled/disabled check.
type ProviderLoadManager struct {
	rateLimitThreshold      atomic.Uint64 // configured relays-per-second limit; never 0 for a non-nil manager
	activeRequestsPerSecond atomic.Uint64 // current number of in-flight relay calls
}

// NewProviderLoadManager returns a load manager enforcing the given
// relays-per-second threshold. A threshold of 0 means load reporting is
// disabled and nil is returned (the nil manager is safe to use).
func NewProviderLoadManager(rateLimitThreshold uint64) *ProviderLoadManager {
	if rateLimitThreshold == 0 {
		return nil
	}
	loadManager := &ProviderLoadManager{}
	loadManager.rateLimitThreshold.Store(rateLimitThreshold)
	// activeRequestsPerSecond needs no explicit Store(0): the zero value of
	// atomic.Uint64 is already 0.
	return loadManager
}

// addRelayCall registers the start of a relay request. Safe on a nil receiver.
func (loadManager *ProviderLoadManager) addRelayCall() {
	if loadManager == nil {
		return
	}
	loadManager.activeRequestsPerSecond.Add(1)
}

// subtractRelayCall registers the completion of a relay request.
// Safe on a nil receiver.
func (loadManager *ProviderLoadManager) subtractRelayCall() {
	if loadManager == nil {
		return
	}
	// Adding ^uint64(0) (all bits set) is the documented atomic idiom for
	// decrementing an unsigned counter by one.
	loadManager.activeRequestsPerSecond.Add(^uint64(0))
}

// getProviderLoad returns the current load as the ratio of in-flight relays
// to the rate-limit threshold, formatted as a decimal string (e.g. "0.5").
// It returns "" for a disabled (nil) manager.
//
// Fix: the ratio is computed in floating point. The previous unsigned integer
// division truncated every load below the threshold down to "0", making the
// reported load useless until the provider was already saturated.
func (loadManager *ProviderLoadManager) getProviderLoad() string {
	if loadManager == nil {
		return ""
	}
	rateLimitThreshold := loadManager.rateLimitThreshold.Load()
	if rateLimitThreshold == 0 {
		// Defensive only: NewProviderLoadManager never stores 0, but guard
		// against division by zero if the struct was constructed directly.
		return "0"
	}
	load := float64(loadManager.activeRequestsPerSecond.Load()) / float64(rateLimitThreshold)
	return strconv.FormatFloat(load, 'f', -1, 64)
}
12 changes: 10 additions & 2 deletions protocol/rpcprovider/rpcprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ type rpcProviderStartOptions struct {
healthCheckMetricsOptions *rpcProviderHealthCheckMetricsOptions
staticProvider bool
staticSpecPath string
relayLoadLimit uint64
}

type rpcProviderHealthCheckMetricsOptions struct {
Expand Down Expand Up @@ -141,6 +142,7 @@ type RPCProvider struct {
providerUniqueId string
staticProvider bool
staticSpecPath string
relayLoadLimit uint64
ranlavanet marked this conversation as resolved.
Show resolved Hide resolved
}

func (rpcp *RPCProvider) Start(options *rpcProviderStartOptions) (err error) {
Expand All @@ -165,6 +167,7 @@ func (rpcp *RPCProvider) Start(options *rpcProviderStartOptions) (err error) {
rpcp.grpcHealthCheckEndpoint = options.healthCheckMetricsOptions.grpcHealthCheckEndpoint
rpcp.staticProvider = options.staticProvider
rpcp.staticSpecPath = options.staticSpecPath
rpcp.relayLoadLimit = options.relayLoadLimit

// single state tracker
lavaChainFetcher := chainlib.NewLavaChainFetcher(ctx, options.clientCtx)
Expand Down Expand Up @@ -486,7 +489,7 @@ func (rpcp *RPCProvider) SetupEndpoint(ctx context.Context, rpcProviderEndpoint
providerNodeSubscriptionManager = chainlib.NewProviderNodeSubscriptionManager(chainRouter, chainParser, rpcProviderServer, rpcp.privKey)
}

rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rpcp.rewardServer, providerSessionManager, reliabilityManager, rpcp.privKey, rpcp.cache, chainRouter, rpcp.providerStateTracker, rpcp.addr, rpcp.lavaChainID, DEFAULT_ALLOWED_MISSING_CU, providerMetrics, relaysMonitor, providerNodeSubscriptionManager, rpcp.staticProvider)
rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rpcp.rewardServer, providerSessionManager, reliabilityManager, rpcp.privKey, rpcp.cache, chainRouter, rpcp.providerStateTracker, rpcp.addr, rpcp.lavaChainID, DEFAULT_ALLOWED_MISSING_CU, providerMetrics, relaysMonitor, providerNodeSubscriptionManager, rpcp.staticProvider, rpcp.relayLoadLimit)
ranlavanet marked this conversation as resolved.
Show resolved Hide resolved
// set up grpc listener
var listener *ProviderListener
func() {
Expand Down Expand Up @@ -717,6 +720,10 @@ rpcprovider 127.0.0.1:3333 OSMOSIS tendermintrpc "wss://www.node-path.com:80,htt
if stickinessHeaderName != "" {
RPCProviderStickinessHeaderName = stickinessHeaderName
}
relayLoadLimit, err := cmd.Flags().GetUint64(common.RateLimitRequestPerSecondFlag)
if err != nil {
utils.LavaFormatFatal("failed to read relay concurrent load limit flag", err)
}
prometheusListenAddr := viper.GetString(metrics.MetricsListenFlagName)
rewardStoragePath := viper.GetString(rewardserver.RewardServerStorageFlagName)
rewardTTL := viper.GetDuration(rewardserver.RewardTTLFlagName)
Expand Down Expand Up @@ -754,6 +761,7 @@ rpcprovider 127.0.0.1:3333 OSMOSIS tendermintrpc "wss://www.node-path.com:80,htt
&rpcProviderHealthCheckMetricsOptions,
staticProvider,
offlineSpecPath,
relayLoadLimit,
}

rpcProvider := RPCProvider{}
Expand Down Expand Up @@ -790,7 +798,7 @@ rpcprovider 127.0.0.1:3333 OSMOSIS tendermintrpc "wss://www.node-path.com:80,htt
cmdRPCProvider.Flags().BoolVar(&chainlib.IgnoreSubscriptionNotConfiguredError, chainlib.IgnoreSubscriptionNotConfiguredErrorFlag, chainlib.IgnoreSubscriptionNotConfiguredError, "ignore webSocket node url not configured error, when subscription is enabled in spec")
cmdRPCProvider.Flags().IntVar(&numberOfRetriesAllowedOnNodeErrors, common.SetRelayCountOnNodeErrorFlag, 2, "set the number of retries attempt on node errors")
cmdRPCProvider.Flags().String(common.UseStaticSpecFlag, "", "load offline spec provided path to spec file, used to test specs before they are proposed on chain, example for spec with inheritance: --use-static-spec ./cookbook/specs/ibc.json,./cookbook/specs/tendermint.json,./cookbook/specs/cosmossdk.json,./cookbook/specs/ethermint.json,./cookbook/specs/ethereum.json,./cookbook/specs/evmos.json")

cmdRPCProvider.Flags().Uint(common.RateLimitRequestPerSecondFlag, 0, "rate limit requests per second - per chain - default unlimited")
common.AddRollingLogConfig(cmdRPCProvider)
return cmdRPCProvider
}
10 changes: 9 additions & 1 deletion protocol/rpcprovider/rpcprovider_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type RPCProviderServer struct {
providerUniqueId string
StaticProvider bool
providerStateMachine *ProviderStateMachine
providerLoadManager *ProviderLoadManager
}

type ReliabilityManagerInf interface {
Expand Down Expand Up @@ -112,6 +113,7 @@ func (rpcps *RPCProviderServer) ServeRPCRequests(
relaysMonitor *metrics.RelaysMonitor,
providerNodeSubscriptionManager *chainlib.ProviderNodeSubscriptionManager,
staticProvider bool,
relayLoadLimit uint64,
ranlavanet marked this conversation as resolved.
Show resolved Hide resolved
) {
rpcps.cache = cache
rpcps.chainRouter = chainRouter
Expand All @@ -134,6 +136,7 @@ func (rpcps *RPCProviderServer) ServeRPCRequests(
rpcps.relaysMonitor = relaysMonitor
rpcps.providerNodeSubscriptionManager = providerNodeSubscriptionManager
rpcps.providerStateMachine = NewProviderStateMachine(rpcProviderEndpoint.ChainID, lavaprotocol.NewRelayRetriesManager(), chainRouter)
rpcps.providerLoadManager = NewProviderLoadManager(relayLoadLimit)
ranlavanet marked this conversation as resolved.
Show resolved Hide resolved

rpcps.initRelaysMonitor(ctx)
}
Expand Down Expand Up @@ -180,7 +183,12 @@ func (rpcps *RPCProviderServer) craftChainMessage() (chainMessage chainlib.Chain

// function used to handle relay requests from a consumer, it is called by a provider_listener by calling RegisterReceiver
func (rpcps *RPCProviderServer) Relay(ctx context.Context, request *pairingtypes.RelayRequest) (*pairingtypes.RelayReply, error) {
grpc.SetTrailer(ctx, metadata.Pairs(chainlib.RpcProviderUniqueIdHeader, rpcps.providerUniqueId))
// count the number of simultaneous relay calls
rpcps.providerLoadManager.addRelayCall()
defer func() { go rpcps.providerLoadManager.subtractRelayCall() }()
shleikes marked this conversation as resolved.
Show resolved Hide resolved
provideRelayLoad := rpcps.providerLoadManager.getProviderLoad()
ranlavanet marked this conversation as resolved.
Show resolved Hide resolved
trailerMd := metadata.Pairs(chainlib.RpcProviderUniqueIdHeader, rpcps.providerUniqueId, chainlib.RpcProviderLoadRateHeader, provideRelayLoad)
grpc.SetTrailer(ctx, trailerMd)
if request.RelayData == nil || request.RelaySession == nil {
return nil, utils.LavaFormatWarning("invalid relay request, internal fields are nil", nil)
}
Expand Down
Loading