From 7b8ea69d08926d41eb00ffcf69bb34584061b431 Mon Sep 17 00:00:00 2001 From: leon mandel Date: Mon, 30 Sep 2024 12:55:59 +0200 Subject: [PATCH 01/47] load rate report in trailer --- protocol/chainlib/common.go | 1 + protocol/common/cobra_common.go | 3 +- protocol/rpcconsumer/rpcconsumer_server.go | 21 ++++++++----- protocol/rpcprovider/provider_load_manager.go | 31 +++++++++++++++++++ protocol/rpcprovider/rpcprovider.go | 16 ++++++++-- protocol/rpcprovider/rpcprovider_server.go | 10 +++++- 6 files changed, 70 insertions(+), 12 deletions(-) create mode 100644 protocol/rpcprovider/provider_load_manager.go diff --git a/protocol/chainlib/common.go b/protocol/chainlib/common.go index e2cfb01f80..7694e841e6 100644 --- a/protocol/chainlib/common.go +++ b/protocol/chainlib/common.go @@ -30,6 +30,7 @@ const ( relayMsgLogMaxChars = 200 RPCProviderNodeAddressHash = "Lava-Provider-Node-Address-Hash" RPCProviderNodeExtension = "Lava-Provider-Node-Extension" + RpcProviderLoadRateHeader = "provider_load_rate" RpcProviderUniqueIdHeader = "Lava-Provider-Unique-Id" WebSocketExtension = "websocket" ) diff --git a/protocol/common/cobra_common.go b/protocol/common/cobra_common.go index fe75c8f31f..967bbdb871 100644 --- a/protocol/common/cobra_common.go +++ b/protocol/common/cobra_common.go @@ -40,7 +40,8 @@ const ( SetProviderOptimizerNumberOfTiersToCreate = "set-provider-optimizer-number-of-tiers-to-create" // websocket flags - RateLimitWebSocketFlag = "rate-limit-websocket-requests-per-connection" + RateLimitWebSocketFlag = "rate-limit-websocket-requests-per-connection" + MaxProviderConcurrentRelayRequestsFlag = "relay-concurrent-load-limit" ) const ( diff --git a/protocol/rpcconsumer/rpcconsumer_server.go b/protocol/rpcconsumer/rpcconsumer_server.go index af11a2d952..564117d86b 100644 --- a/protocol/rpcconsumer/rpcconsumer_server.go +++ b/protocol/rpcconsumer/rpcconsumer_server.go @@ -1260,6 +1260,17 @@ func (rpccs *RPCConsumerServer) HandleDirectiveHeadersForMessage(chainMessage ch chainMessage.SetForceCacheRefresh(ok) } +func (rpccs *RPCConsumerServer) getMetadataFromRelayTrailer(metadataHeader string, relayResult *common.RelayResult) { + trailerValue := relayResult.ProviderTrailer.Get(metadataHeader) + if len(trailerValue) > 0 { + extensionMD := pairingtypes.Metadata{ + Name: metadataHeader, + Value: trailerValue[0], + } + relayResult.Reply.Metadata = append(relayResult.Reply.Metadata, extensionMD) + } +} + func (rpccs *RPCConsumerServer) appendHeadersToRelayResult(ctx context.Context, relayResult *common.RelayResult, protocolErrors uint64, relayProcessor *RelayProcessor, protocolMessage chainlib.ProtocolMessage, apiName string) { if relayResult == nil { return @@ -1332,14 +1343,8 @@ func (rpccs *RPCConsumerServer) appendHeadersToRelayResult(ctx context.Context, } // fetch trailer information from the provider by using the provider trailer field. - providerNodeExtensions := relayResult.ProviderTrailer.Get(chainlib.RPCProviderNodeExtension) - if len(providerNodeExtensions) > 0 { - extensionMD := pairingtypes.Metadata{ - Name: chainlib.RPCProviderNodeExtension, - Value: providerNodeExtensions[0], - } - relayResult.Reply.Metadata = append(relayResult.Reply.Metadata, extensionMD) - } + rpccs.getMetadataFromRelayTrailer(chainlib.RPCProviderNodeExtension, relayResult) + rpccs.getMetadataFromRelayTrailer(chainlib.RpcProviderLoadRateHeader, relayResult) directiveHeaders := protocolMessage.GetDirectiveHeaders() _, debugRelays := directiveHeaders[common.LAVA_DEBUG_RELAY] diff --git a/protocol/rpcprovider/provider_load_manager.go b/protocol/rpcprovider/provider_load_manager.go new file mode 100644 index 0000000000..c63f1c9683 --- /dev/null +++ b/protocol/rpcprovider/provider_load_manager.go @@ -0,0 +1,31 @@ +package rpcprovider + +import ( + "sync/atomic" +) + +type ProviderLoadManager struct { + totalSimultaneousRelays int64 + activeRequestsPerSecond int64 +} + +func (loadManager *ProviderLoadManager) addRelayCall() { + atomic.AddInt64(&loadManager.totalSimultaneousRelays, 1) +} + +func (loadManager *ProviderLoadManager) removeRelayCall() { + atomic.AddInt64(&loadManager.totalSimultaneousRelays, -1) +} + +func (loadManager *ProviderLoadManager) getRelayCallCount() int64 { + atomic.LoadInt64(&loadManager.totalSimultaneousRelays) + return loadManager.totalSimultaneousRelays +} + +func (loadManager *ProviderLoadManager) getProviderLoad() float64 { + if loadManager.getRelayCallCount() == 0 { + return 0 + } + + return float64(loadManager.activeRequestsPerSecond / loadManager.getRelayCallCount()) +} diff --git a/protocol/rpcprovider/rpcprovider.go b/protocol/rpcprovider/rpcprovider.go index 9b860b433b..d3f66088e9 100644 --- a/protocol/rpcprovider/rpcprovider.go +++ b/protocol/rpcprovider/rpcprovider.go @@ -110,6 +110,7 @@ type rpcProviderStartOptions struct { healthCheckMetricsOptions *rpcProviderHealthCheckMetricsOptions staticProvider bool staticSpecPath string + relayLoadLimit uint } type rpcProviderHealthCheckMetricsOptions struct { @@ -141,6 +142,7 @@ type RPCProvider struct { providerUniqueId string staticProvider bool staticSpecPath string + providerLoadManager *ProviderLoadManager } func (rpcp *RPCProvider) Start(options *rpcProviderStartOptions) (err error) { @@ -165,6 +167,10 @@ func (rpcp *RPCProvider) Start(options *rpcProviderStartOptions) (err error) { rpcp.grpcHealthCheckEndpoint = options.healthCheckMetricsOptions.grpcHealthCheckEndpoint rpcp.staticProvider = options.staticProvider rpcp.staticSpecPath = options.staticSpecPath + rpcp.providerLoadManager = &ProviderLoadManager{ + totalSimultaneousRelays: int64(options.relayLoadLimit), + activeRequestsPerSecond: 0, + } // single state tracker lavaChainFetcher := chainlib.NewLavaChainFetcher(ctx, options.clientCtx) @@ -486,7 +492,7 @@ func (rpcp *RPCProvider) SetupEndpoint(ctx context.Context, rpcProviderEndpoint providerNodeSubscriptionManager = chainlib.NewProviderNodeSubscriptionManager(chainRouter, chainParser, rpcProviderServer, rpcp.privKey) } - rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rpcp.rewardServer, providerSessionManager, reliabilityManager, rpcp.privKey, rpcp.cache, chainRouter, rpcp.providerStateTracker, rpcp.addr, rpcp.lavaChainID, DEFAULT_ALLOWED_MISSING_CU, providerMetrics, relaysMonitor, providerNodeSubscriptionManager, rpcp.staticProvider) + rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rpcp.rewardServer, providerSessionManager, reliabilityManager, rpcp.privKey, rpcp.cache, chainRouter, rpcp.providerStateTracker, rpcp.addr, rpcp.lavaChainID, DEFAULT_ALLOWED_MISSING_CU, providerMetrics, relaysMonitor, providerNodeSubscriptionManager, rpcp.staticProvider, rpcp.providerLoadManager) // set up grpc listener var listener *ProviderListener func() { @@ -717,6 +723,11 @@ rpcprovider 127.0.0.1:3333 OSMOSIS tendermintrpc "wss://www.node-path.com:80,htt if stickinessHeaderName != "" { RPCProviderStickinessHeaderName = stickinessHeaderName } + relay_load_limit, err := cmd.Flags().GetUint(common.MaxProviderConcurrentRelayRequestsFlag) + if err != nil { + utils.LavaFormatFatal("failed to read relay concurrent loadl limit flag", err) + } + utils.SetGlobalLoggingLevel(logLevel) prometheusListenAddr := viper.GetString(metrics.MetricsListenFlagName) rewardStoragePath := viper.GetString(rewardserver.RewardServerStorageFlagName) rewardTTL := viper.GetDuration(rewardserver.RewardTTLFlagName) @@ -754,6 +765,7 @@ rpcprovider 127.0.0.1:3333 OSMOSIS tendermintrpc "wss://www.node-path.com:80,htt &rpcProviderHealthCheckMetricsOptions, staticProvider, offlineSpecPath, + relay_load_limit, } rpcProvider := RPCProvider{} @@ -790,7 +802,7 @@ rpcprovider 127.0.0.1:3333 OSMOSIS tendermintrpc "wss://www.node-path.com:80,htt cmdRPCProvider.Flags().BoolVar(&chainlib.IgnoreSubscriptionNotConfiguredError, chainlib.IgnoreSubscriptionNotConfiguredErrorFlag, chainlib.IgnoreSubscriptionNotConfiguredError, "ignore webSocket node url not configured error, when subscription is enabled in spec") cmdRPCProvider.Flags().IntVar(&numberOfRetriesAllowedOnNodeErrors, common.SetRelayCountOnNodeErrorFlag, 2, "set the number of retries attempt on node errors") cmdRPCProvider.Flags().String(common.UseStaticSpecFlag, "", "load offline spec provided path to spec file, used to test specs before they are proposed on chain, example for spec with inheritance: --use-static-spec ./cookbook/specs/ibc.json,./cookbook/specs/tendermint.json,./cookbook/specs/cosmossdk.json,./cookbook/specs/ethermint.json,./cookbook/specs/ethereum.json,./cookbook/specs/evmos.json") - + cmdRPCProvider.Flags().Uint(common.MaxProviderConcurrentRelayRequestsFlag, 0, "Simultanius relay load count limit") common.AddRollingLogConfig(cmdRPCProvider) return cmdRPCProvider } diff --git a/protocol/rpcprovider/rpcprovider_server.go b/protocol/rpcprovider/rpcprovider_server.go index 539e728165..32cd415949 100644 --- a/protocol/rpcprovider/rpcprovider_server.go +++ b/protocol/rpcprovider/rpcprovider_server.go @@ -71,6 +71,7 @@ type RPCProviderServer struct { providerUniqueId string StaticProvider bool providerStateMachine *ProviderStateMachine + providerLoadManager *ProviderLoadManager } type ReliabilityManagerInf interface { @@ -112,6 +113,7 @@ func (rpcps *RPCProviderServer) ServeRPCRequests( relaysMonitor *metrics.RelaysMonitor, providerNodeSubscriptionManager *chainlib.ProviderNodeSubscriptionManager, staticProvider bool, + providerLoadManager *ProviderLoadManager, ) { rpcps.cache = cache rpcps.chainRouter = chainRouter @@ -134,6 +136,7 @@ func (rpcps *RPCProviderServer) ServeRPCRequests( rpcps.relaysMonitor = relaysMonitor rpcps.providerNodeSubscriptionManager = providerNodeSubscriptionManager rpcps.providerStateMachine = NewProviderStateMachine(rpcProviderEndpoint.ChainID, lavaprotocol.NewRelayRetriesManager(), chainRouter) + rpcps.providerLoadManager = providerLoadManager rpcps.initRelaysMonitor(ctx) } @@ -180,7 +183,12 @@ func (rpcps *RPCProviderServer) craftChainMessage() (chainMessage chainlib.Chain // function used to handle relay requests from a consumer, it is called by a provider_listener by calling RegisterReceiver func (rpcps *RPCProviderServer) Relay(ctx context.Context, request *pairingtypes.RelayRequest) (*pairingtypes.RelayReply, error) { - grpc.SetTrailer(ctx, metadata.Pairs(chainlib.RpcProviderUniqueIdHeader, rpcps.providerUniqueId)) + // count the number of simultanious relay calls + rpcps.providerLoadManager.addRelayCall() + defer rpcps.providerLoadManager.removeRelayCall() + provider_relay_load := strconv.FormatFloat(rpcps.providerLoadManager.getProviderLoad(), 'f', -1, 64) + trailer_md := metadata.Pairs(chainlib.RpcProviderUniqueIdHeader, rpcps.providerUniqueId, chainlib.RpcProviderLoadRateHeader, provider_relay_load) + grpc.SetTrailer(ctx, trailer_md) if request.RelayData == nil || request.RelaySession == nil { return nil, utils.LavaFormatWarning("invalid relay request, internal fields are nil", nil) } From 4f7838a23d3d01b8970d90d9bc8e58eb2de243b2 Mon Sep 17 00:00:00 2001 From: leon mandel Date: Mon, 30 Sep 2024 13:58:17 +0200 Subject: [PATCH 02/47] fix trailer name --- protocol/chainlib/common.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocol/chainlib/common.go b/protocol/chainlib/common.go index 7694e841e6..1d80737e3a 100644 --- a/protocol/chainlib/common.go +++ b/protocol/chainlib/common.go @@ -30,7 +30,7 @@ const ( relayMsgLogMaxChars = 200 RPCProviderNodeAddressHash = "Lava-Provider-Node-Address-Hash" RPCProviderNodeExtension = "Lava-Provider-Node-Extension" - RpcProviderLoadRateHeader = "provider_load_rate" + RpcProviderLoadRateHeader = "provider-load-rate" RpcProviderUniqueIdHeader = "Lava-Provider-Unique-Id" WebSocketExtension = "websocket" ) From ea1598b13b2b57cb65eee48240cb82a7cde0d814 Mon Sep 17 00:00:00 2001 From: leon mandel Date: Mon, 30 Sep 2024 15:55:27 +0200 Subject: [PATCH 03/47] fix lint --- protocol/integration/protocol_test.go | 6 +++++- protocol/rpcprovider/provider_load_manager.go | 14 +++++++------- protocol/rpcprovider/rpcprovider.go | 4 ++-- 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/protocol/integration/protocol_test.go b/protocol/integration/protocol_test.go index cf3a0412d0..deee9f61dc 100644 --- a/protocol/integration/protocol_test.go +++ b/protocol/integration/protocol_test.go @@ -344,7 +344,11 @@ func createRpcProvider(t *testing.T, ctx context.Context, rpcProviderOptions rpc chainTracker.StartAndServe(ctx) reliabilityManager := reliabilitymanager.NewReliabilityManager(chainTracker, &mockProviderStateTracker, rpcProviderOptions.account.Addr.String(), chainRouter, chainParser) mockReliabilityManager := NewMockReliabilityManager(reliabilityManager) - rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rws, providerSessionManager, mockReliabilityManager, rpcProviderOptions.account.SK, cache, chainRouter, &mockProviderStateTracker, rpcProviderOptions.account.Addr, rpcProviderOptions.lavaChainID, rpcprovider.DEFAULT_ALLOWED_MISSING_CU, nil, nil, nil, false) + providerLoadManager := &rpcprovider.ProviderLoadManager{ + TotalSimultaneousRelays: 0, + ActiveRequestsPerSecond: 0, + } + rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rws, providerSessionManager, mockReliabilityManager, rpcProviderOptions.account.SK, cache, chainRouter, &mockProviderStateTracker, rpcProviderOptions.account.Addr, rpcProviderOptions.lavaChainID, rpcprovider.DEFAULT_ALLOWED_MISSING_CU, nil, nil, nil, false, providerLoadManager) listener := rpcprovider.NewProviderListener(ctx, rpcProviderEndpoint.NetworkAddress, "/health") err = listener.RegisterReceiver(rpcProviderServer, rpcProviderEndpoint) require.NoError(t, err) diff --git a/protocol/rpcprovider/provider_load_manager.go b/protocol/rpcprovider/provider_load_manager.go index c63f1c9683..07bf4ccdc5 100644 --- a/protocol/rpcprovider/provider_load_manager.go +++ b/protocol/rpcprovider/provider_load_manager.go @@ -5,21 +5,21 @@ import ( ) type ProviderLoadManager struct { - totalSimultaneousRelays int64 - activeRequestsPerSecond int64 + TotalSimultaneousRelays int64 + ActiveRequestsPerSecond int64 } func (loadManager *ProviderLoadManager) addRelayCall() { - atomic.AddInt64(&loadManager.totalSimultaneousRelays, 1) + atomic.AddInt64(&loadManager.ActiveRequestsPerSecond, 1) } func (loadManager *ProviderLoadManager) removeRelayCall() { - atomic.AddInt64(&loadManager.totalSimultaneousRelays, -1) + atomic.AddInt64(&loadManager.ActiveRequestsPerSecond, -1) } func (loadManager *ProviderLoadManager) getRelayCallCount() int64 { - atomic.LoadInt64(&loadManager.totalSimultaneousRelays) - return loadManager.totalSimultaneousRelays + totalRelays := atomic.LoadInt64(&loadManager.TotalSimultaneousRelays) + return totalRelays } func (loadManager *ProviderLoadManager) getProviderLoad() float64 { @@ -27,5 +27,5 @@ func (loadManager *ProviderLoadManager) getProviderLoad() float64 { return 0 } - return float64(loadManager.activeRequestsPerSecond / loadManager.getRelayCallCount()) + return float64(loadManager.getRelayCallCount() / loadManager.TotalSimultaneousRelays) } diff --git a/protocol/rpcprovider/rpcprovider.go b/protocol/rpcprovider/rpcprovider.go index d3f66088e9..d34f36a0c8 100644 --- a/protocol/rpcprovider/rpcprovider.go +++ b/protocol/rpcprovider/rpcprovider.go @@ -168,8 +168,8 @@ func (rpcp *RPCProvider) Start(options *rpcProviderStartOptions) (err error) { rpcp.staticProvider = options.staticProvider rpcp.staticSpecPath = options.staticSpecPath rpcp.providerLoadManager = &ProviderLoadManager{ - totalSimultaneousRelays: int64(options.relayLoadLimit), - activeRequestsPerSecond: 0, + TotalSimultaneousRelays: int64(options.relayLoadLimit), + ActiveRequestsPerSecond: 0, } // single state tracker From 04f176273bd5f3d5a4972002923e455492957b7a Mon Sep 17 00:00:00 2001 From: leon mandel Date: Mon, 30 Sep 2024 15:58:54 +0200 Subject: [PATCH 04/47] fix load manager logic --- protocol/rpcprovider/provider_load_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocol/rpcprovider/provider_load_manager.go b/protocol/rpcprovider/provider_load_manager.go index 07bf4ccdc5..5a6cb5ee8e 100644 --- a/protocol/rpcprovider/provider_load_manager.go +++ b/protocol/rpcprovider/provider_load_manager.go @@ -23,7 +23,7 @@ func (loadManager *ProviderLoadManager) getRelayCallCount() int64 { } func (loadManager *ProviderLoadManager) getProviderLoad() float64 { - if loadManager.getRelayCallCount() == 0 { + if loadManager.TotalSimultaneousRelays == 0 { return 0 } From eb2b3453ae9b57d02c0408909482cd34425645f0 Mon Sep 17 00:00:00 2001 From: leon mandel Date: Mon, 30 Sep 2024 16:00:42 +0200 Subject: [PATCH 05/47] fix lint --- protocol/common/cobra_common.go | 2 +- scripts/pre_setups/init_lava_only_with_node.sh | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/protocol/common/cobra_common.go b/protocol/common/cobra_common.go index 5326abbb7d..e7ee3ebc8a 100644 --- a/protocol/common/cobra_common.go +++ b/protocol/common/cobra_common.go @@ -42,7 +42,7 @@ const ( // websocket flags RateLimitWebSocketFlag = "rate-limit-websocket-requests-per-connection" BanDurationForWebsocketRateLimitExceededFlag = "ban-duration-for-websocket-rate-limit-exceeded" - MaxProviderConcurrentRelayRequestsFlag = "relay-concurrent-load-limit" + MaxProviderConcurrentRelayRequestsFlag = "relay-concurrent-load-limit" ) const ( diff --git a/scripts/pre_setups/init_lava_only_with_node.sh b/scripts/pre_setups/init_lava_only_with_node.sh index d99ddc4094..6fa5ee7f1c 100755 --- a/scripts/pre_setups/init_lava_only_with_node.sh +++ b/scripts/pre_setups/init_lava_only_with_node.sh @@ -51,7 +51,7 @@ screen -d -m -S provider1 bash -c "source ~/.bashrc; lavap rpcprovider \ $PROVIDER1_LISTENER LAV1 rest '$LAVA_REST' \ $PROVIDER1_LISTENER LAV1 tendermintrpc '$LAVA_RPC,$LAVA_RPC_WS' \ $PROVIDER1_LISTENER LAV1 grpc '$LAVA_GRPC' \ -$EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level trace --from servicer1 --chain-id lava --metrics-listen-address ":7776" 2>&1 | tee $LOGS_DIR/PROVIDER1.log" && sleep 0.25 +$EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level trace --from servicer1 --chain-id lava --relay-concurrent-load-limit 1 --metrics-listen-address ":7776" 2>&1 | tee $LOGS_DIR/PROVIDER1.log" && sleep 0.25 wait_next_block @@ -59,5 +59,10 @@ screen -d -m -S consumers bash -c "source ~/.bashrc; lavap rpcconsumer \ 127.0.0.1:3360 LAV1 rest 127.0.0.1:3361 LAV1 tendermintrpc 127.0.0.1:3362 LAV1 grpc \ $EXTRA_PORTAL_FLAGS --geolocation 1 --log_level trace --from user1 --chain-id lava --add-api-method-metrics --allow-insecure-provider-dialing --metrics-listen-address ":7779" 2>&1 | tee $LOGS_DIR/CONSUMERS.log" && sleep 0.25 +screen -d -m -S consumers bash -c "source ~/.bashrc; lavap rpcconsumer \ +127.0.0.1:3370 LAV1 rest 127.0.0.1:3371 LAV1 tendermintrpc 127.0.0.1:3372 LAV1 grpc \ +$EXTRA_PORTAL_FLAGS --geolocation 1 --log_level trace --from user1 --chain-id lava --add-api-method-metrics --allow-insecure-provider-dialing --metrics-listen-address ":7779" 2>&1 | tee $LOGS_DIR/CONSUMERS.log" && sleep 0.25 + + echo "--- setting up screens done ---" screen -ls \ No newline at end of file From 2c339351b89f34aeb6e1900134cabd57bfc77f8c Mon Sep 17 00:00:00 2001 From: leon mandel Date: Mon, 30 Sep 2024 16:05:14 +0200 Subject: [PATCH 06/47] fix spelling --- protocol/rpcprovider/rpcprovider_server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocol/rpcprovider/rpcprovider_server.go b/protocol/rpcprovider/rpcprovider_server.go index 32cd415949..89bf86cc86 100644 --- a/protocol/rpcprovider/rpcprovider_server.go +++ b/protocol/rpcprovider/rpcprovider_server.go @@ -183,7 +183,7 @@ func (rpcps *RPCProviderServer) craftChainMessage() (chainMessage chainlib.Chain // function used to handle relay requests from a consumer, it is called by a provider_listener by calling RegisterReceiver func (rpcps *RPCProviderServer) Relay(ctx context.Context, request *pairingtypes.RelayRequest) (*pairingtypes.RelayReply, error) { - // count the number of simultanious relay calls + // count the number of simultaneous relay calls rpcps.providerLoadManager.addRelayCall() defer rpcps.providerLoadManager.removeRelayCall() provider_relay_load := strconv.FormatFloat(rpcps.providerLoadManager.getProviderLoad(), 'f', -1, 64) From 46e6fafb72eade15d4ef8160e74c49999286b78c Mon Sep 17 00:00:00 2001 From: leon mandel Date: Mon, 30 Sep 2024 16:05:58 +0200 Subject: [PATCH 07/47] fix logic --- protocol/rpcprovider/provider_load_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocol/rpcprovider/provider_load_manager.go b/protocol/rpcprovider/provider_load_manager.go index 5a6cb5ee8e..a5bbd79f8c 100644 --- a/protocol/rpcprovider/provider_load_manager.go +++ b/protocol/rpcprovider/provider_load_manager.go @@ -18,7 +18,7 @@ func (loadManager *ProviderLoadManager) removeRelayCall() { } func (loadManager *ProviderLoadManager) getRelayCallCount() int64 { - totalRelays := atomic.LoadInt64(&loadManager.TotalSimultaneousRelays) + totalRelays := atomic.LoadInt64(&loadManager.ActiveRequestsPerSecond) return totalRelays } From 1f977aed207183ca7572077bb0bb8c2b5bd36769 Mon Sep 17 00:00:00 2001 From: leon mandel Date: Tue, 1 Oct 2024 12:51:45 +0200 Subject: [PATCH 08/47] fixed flag & header names --- protocol/chainlib/common.go | 2 +- protocol/common/cobra_common.go | 2 +- protocol/rpcprovider/rpcprovider.go | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/protocol/chainlib/common.go b/protocol/chainlib/common.go index 1d80737e3a..0ea2a6c97d 100644 --- a/protocol/chainlib/common.go +++ b/protocol/chainlib/common.go @@ -30,7 +30,7 @@ const ( relayMsgLogMaxChars = 200 RPCProviderNodeAddressHash = "Lava-Provider-Node-Address-Hash" RPCProviderNodeExtension = "Lava-Provider-Node-Extension" - RpcProviderLoadRateHeader = "provider-load-rate" + RpcProviderLoadRateHeader = "Lava-provider-load-rate" RpcProviderUniqueIdHeader = "Lava-Provider-Unique-Id" WebSocketExtension = "websocket" ) diff --git a/protocol/common/cobra_common.go b/protocol/common/cobra_common.go index e7ee3ebc8a..e4c545d345 100644 --- a/protocol/common/cobra_common.go +++ b/protocol/common/cobra_common.go @@ -42,7 +42,7 @@ const ( // websocket flags RateLimitWebSocketFlag = "rate-limit-websocket-requests-per-connection" BanDurationForWebsocketRateLimitExceededFlag = "ban-duration-for-websocket-rate-limit-exceeded" - MaxProviderConcurrentRelayRequestsFlag = "relay-concurrent-load-limit" + RateLimitRequestPerSecondFlag = "rate-limit-requests-per-second" ) const ( diff --git a/protocol/rpcprovider/rpcprovider.go b/protocol/rpcprovider/rpcprovider.go index d34f36a0c8..cc19d31101 100644 --- a/protocol/rpcprovider/rpcprovider.go +++ b/protocol/rpcprovider/rpcprovider.go @@ -723,7 +723,7 @@ rpcprovider 127.0.0.1:3333 OSMOSIS tendermintrpc "wss://www.node-path.com:80,htt if stickinessHeaderName != "" { RPCProviderStickinessHeaderName = stickinessHeaderName } - relay_load_limit, err := cmd.Flags().GetUint(common.MaxProviderConcurrentRelayRequestsFlag) + relay_load_limit, err := cmd.Flags().GetUint(common.RateLimitRequestPerSecondFlag) if err != nil { utils.LavaFormatFatal("failed to read relay concurrent loadl limit flag", err) } @@ -802,7 +802,7 @@ rpcprovider 127.0.0.1:3333 OSMOSIS tendermintrpc "wss://www.node-path.com:80,htt cmdRPCProvider.Flags().BoolVar(&chainlib.IgnoreSubscriptionNotConfiguredError, chainlib.IgnoreSubscriptionNotConfiguredErrorFlag, chainlib.IgnoreSubscriptionNotConfiguredError, "ignore webSocket node url not configured error, when subscription is enabled in spec") cmdRPCProvider.Flags().IntVar(&numberOfRetriesAllowedOnNodeErrors, common.SetRelayCountOnNodeErrorFlag, 2, "set the number of retries attempt on node errors") cmdRPCProvider.Flags().String(common.UseStaticSpecFlag, "", "load offline spec provided path to spec file, used to test specs before they are proposed on chain, example for spec with inheritance: --use-static-spec ./cookbook/specs/ibc.json,./cookbook/specs/tendermint.json,./cookbook/specs/cosmossdk.json,./cookbook/specs/ethermint.json,./cookbook/specs/ethereum.json,./cookbook/specs/evmos.json") - cmdRPCProvider.Flags().Uint(common.MaxProviderConcurrentRelayRequestsFlag, 0, "Simultanius relay load count limit") + cmdRPCProvider.Flags().Uint(common.RateLimitRequestPerSecondFlag, 0, "Simultanius relay load count limit") common.AddRollingLogConfig(cmdRPCProvider) return cmdRPCProvider } From 6db7dd33d05f92ed3d3feeb6c7106b7d33d1ccc6 Mon Sep 17 00:00:00 2001 From: leon mandel Date: Tue, 1 Oct 2024 13:23:24 +0200 Subject: [PATCH 09/47] fix load provider manager and creation logic --- protocol/rpcprovider/provider_load_manager.go | 44 +++++++++++++------ protocol/rpcprovider/rpcprovider.go | 11 ++--- protocol/rpcprovider/rpcprovider_server.go | 8 ++-- 3 files changed, 39 insertions(+), 24 deletions(-) diff --git a/protocol/rpcprovider/provider_load_manager.go b/protocol/rpcprovider/provider_load_manager.go index a5bbd79f8c..f991385b88 100644 --- a/protocol/rpcprovider/provider_load_manager.go +++ b/protocol/rpcprovider/provider_load_manager.go @@ -1,31 +1,49 @@ package rpcprovider import ( + "strconv" "sync/atomic" ) type ProviderLoadManager struct { - TotalSimultaneousRelays int64 - ActiveRequestsPerSecond int64 + rateLimitThreshold atomic.Uint64 + activeRequestsPerSecond atomic.Uint64 } -func (loadManager *ProviderLoadManager) addRelayCall() { - atomic.AddInt64(&loadManager.ActiveRequestsPerSecond, 1) +func NewProviderLoadManager(rateLimitThreshold uint64) *ProviderLoadManager { + if rateLimitThreshold == 0 { + return nil + } + loadManager := &ProviderLoadManager{} + + loadManager.rateLimitThreshold.Store(rateLimitThreshold) + loadManager.activeRequestsPerSecond.Store(0) + + return loadManager } -func (loadManager *ProviderLoadManager) removeRelayCall() { - atomic.AddInt64(&loadManager.ActiveRequestsPerSecond, -1) +func (loadManager *ProviderLoadManager) addRelayCall() { + if loadManager == nil { + return + } + loadManager.activeRequestsPerSecond.Add(1) } -func (loadManager *ProviderLoadManager) getRelayCallCount() int64 { - totalRelays := atomic.LoadInt64(&loadManager.ActiveRequestsPerSecond) - return totalRelays +func (loadManager *ProviderLoadManager) subtractRelayCall() { + if loadManager == nil { + return + } + loadManager.activeRequestsPerSecond.Add(^uint64(0)) } -func (loadManager *ProviderLoadManager) getProviderLoad() float64 { - if loadManager.TotalSimultaneousRelays == 0 { - return 0 +func (loadManager *ProviderLoadManager) getProviderLoad() string { + if loadManager == nil { + return "0" + } + loadedRateLimitThreshold := loadManager.rateLimitThreshold.Load() + if loadedRateLimitThreshold == 0 { + return "0" } - return float64(loadManager.getRelayCallCount() / loadManager.TotalSimultaneousRelays) + return strconv.FormatUint(loadManager.activeRequestsPerSecond.Load()/loadedRateLimitThreshold, 10) } diff --git a/protocol/rpcprovider/rpcprovider.go b/protocol/rpcprovider/rpcprovider.go index cc19d31101..0330eb3fe2 100644 --- a/protocol/rpcprovider/rpcprovider.go +++ b/protocol/rpcprovider/rpcprovider.go @@ -110,7 +110,7 @@ type rpcProviderStartOptions struct { healthCheckMetricsOptions *rpcProviderHealthCheckMetricsOptions staticProvider bool staticSpecPath string - relayLoadLimit uint + relayLoadLimit uint64 } type rpcProviderHealthCheckMetricsOptions struct { @@ -142,7 +142,7 @@ type RPCProvider struct { providerUniqueId string staticProvider bool staticSpecPath string - providerLoadManager *ProviderLoadManager + relayLoadLimit uint64 } func (rpcp *RPCProvider) Start(options *rpcProviderStartOptions) (err error) { @@ -167,10 +167,7 @@ func (rpcp *RPCProvider) Start(options *rpcProviderStartOptions) (err error) { rpcp.grpcHealthCheckEndpoint = options.healthCheckMetricsOptions.grpcHealthCheckEndpoint rpcp.staticProvider = options.staticProvider rpcp.staticSpecPath = options.staticSpecPath - rpcp.providerLoadManager = &ProviderLoadManager{ - TotalSimultaneousRelays: int64(options.relayLoadLimit), - ActiveRequestsPerSecond: 0, - } + rpcp.relayLoadLimit = options.relayLoadLimit // single state tracker lavaChainFetcher := chainlib.NewLavaChainFetcher(ctx, options.clientCtx) @@ -492,7 +489,7 @@ func (rpcp *RPCProvider) SetupEndpoint(ctx context.Context, rpcProviderEndpoint providerNodeSubscriptionManager = chainlib.NewProviderNodeSubscriptionManager(chainRouter, chainParser, rpcProviderServer, rpcp.privKey) } - rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rpcp.rewardServer, providerSessionManager, reliabilityManager, rpcp.privKey, rpcp.cache, chainRouter, rpcp.providerStateTracker, rpcp.addr, rpcp.lavaChainID, DEFAULT_ALLOWED_MISSING_CU, providerMetrics, relaysMonitor, providerNodeSubscriptionManager, rpcp.staticProvider, rpcp.providerLoadManager) + rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rpcp.rewardServer, providerSessionManager, reliabilityManager, rpcp.privKey, rpcp.cache, chainRouter, rpcp.providerStateTracker, rpcp.addr, rpcp.lavaChainID, DEFAULT_ALLOWED_MISSING_CU, providerMetrics, relaysMonitor, providerNodeSubscriptionManager, rpcp.staticProvider, rpcp.relayLoadLimit) // set up grpc listener var listener *ProviderListener func() { diff --git a/protocol/rpcprovider/rpcprovider_server.go b/protocol/rpcprovider/rpcprovider_server.go index 89bf86cc86..29952f9002 100644 --- a/protocol/rpcprovider/rpcprovider_server.go +++ b/protocol/rpcprovider/rpcprovider_server.go @@ -113,7 +113,7 @@ func (rpcps *RPCProviderServer) ServeRPCRequests( relaysMonitor *metrics.RelaysMonitor, providerNodeSubscriptionManager *chainlib.ProviderNodeSubscriptionManager, staticProvider bool, - providerLoadManager *ProviderLoadManager, + relayLoadLimit uint64, ) { rpcps.cache = cache rpcps.chainRouter = chainRouter @@ -136,7 +136,7 @@ func (rpcps *RPCProviderServer) ServeRPCRequests( rpcps.relaysMonitor = relaysMonitor rpcps.providerNodeSubscriptionManager = providerNodeSubscriptionManager rpcps.providerStateMachine = NewProviderStateMachine(rpcProviderEndpoint.ChainID, lavaprotocol.NewRelayRetriesManager(), chainRouter) - rpcps.providerLoadManager = providerLoadManager + rpcps.providerLoadManager = NewProviderLoadManager(relayLoadLimit) rpcps.initRelaysMonitor(ctx) } @@ -185,8 +185,8 @@ func (rpcps *RPCProviderServer) craftChainMessage() (chainMessage chainlib.Chain func (rpcps *RPCProviderServer) Relay(ctx context.Context, request *pairingtypes.RelayRequest) (*pairingtypes.RelayReply, error) { // count the number of simultaneous relay calls rpcps.providerLoadManager.addRelayCall() - defer rpcps.providerLoadManager.removeRelayCall() - provider_relay_load := strconv.FormatFloat(rpcps.providerLoadManager.getProviderLoad(), 'f', -1, 64) + defer rpcps.providerLoadManager.subtractRelayCall() + provider_relay_load := rpcps.providerLoadManager.getProviderLoad() trailer_md := metadata.Pairs(chainlib.RpcProviderUniqueIdHeader, rpcps.providerUniqueId, chainlib.RpcProviderLoadRateHeader, provider_relay_load) grpc.SetTrailer(ctx, trailer_md) if request.RelayData == nil || request.RelaySession == nil { From b9e199b869ba54e057ad199b17511bca54a5e9c1 Mon Sep 17 00:00:00 2001 From: leon mandel Date: Tue, 1 Oct 2024 13:25:09 +0200 Subject: [PATCH 10/47] fix logs for relay load rate --- protocol/rpcprovider/rpcprovider.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/protocol/rpcprovider/rpcprovider.go b/protocol/rpcprovider/rpcprovider.go index 0330eb3fe2..93eb6742c6 100644 --- a/protocol/rpcprovider/rpcprovider.go +++ b/protocol/rpcprovider/rpcprovider.go @@ -720,11 +720,10 @@ rpcprovider 127.0.0.1:3333 OSMOSIS tendermintrpc "wss://www.node-path.com:80,htt if stickinessHeaderName != "" { RPCProviderStickinessHeaderName = stickinessHeaderName } - relay_load_limit, err := cmd.Flags().GetUint(common.RateLimitRequestPerSecondFlag) + relayLoadLimit, err := cmd.Flags().GetUint64(common.RateLimitRequestPerSecondFlag) if err != nil { utils.LavaFormatFatal("failed to read relay concurrent loadl limit flag", err) } - utils.SetGlobalLoggingLevel(logLevel) prometheusListenAddr := viper.GetString(metrics.MetricsListenFlagName) rewardStoragePath := viper.GetString(rewardserver.RewardServerStorageFlagName) rewardTTL := viper.GetDuration(rewardserver.RewardTTLFlagName) @@ -762,7 +761,7 @@ rpcprovider 127.0.0.1:3333 OSMOSIS tendermintrpc "wss://www.node-path.com:80,htt &rpcProviderHealthCheckMetricsOptions, staticProvider, offlineSpecPath, - relay_load_limit, + relayLoadLimit, } rpcProvider := RPCProvider{} @@ -799,7 +798,7 @@ rpcprovider 127.0.0.1:3333 OSMOSIS tendermintrpc "wss://www.node-path.com:80,htt cmdRPCProvider.Flags().BoolVar(&chainlib.IgnoreSubscriptionNotConfiguredError, chainlib.IgnoreSubscriptionNotConfiguredErrorFlag, chainlib.IgnoreSubscriptionNotConfiguredError, "ignore webSocket node url not configured error, when subscription is enabled in spec") cmdRPCProvider.Flags().IntVar(&numberOfRetriesAllowedOnNodeErrors, common.SetRelayCountOnNodeErrorFlag, 2, "set the number of retries attempt on node errors") cmdRPCProvider.Flags().String(common.UseStaticSpecFlag, "", "load offline spec provided path to spec file, used to test specs before they are proposed on chain, example for spec with inheritance: --use-static-spec ./cookbook/specs/ibc.json,./cookbook/specs/tendermint.json,./cookbook/specs/cosmossdk.json,./cookbook/specs/ethermint.json,./cookbook/specs/ethereum.json,./cookbook/specs/evmos.json") - cmdRPCProvider.Flags().Uint(common.RateLimitRequestPerSecondFlag, 0, "Simultanius relay load count limit") + cmdRPCProvider.Flags().Uint(common.RateLimitRequestPerSecondFlag, 0, "rate limit requests per second - per chain - default unlimited") common.AddRollingLogConfig(cmdRPCProvider) return cmdRPCProvider } From 8b8a05acd54c009763e0aa8de852218165b398f4 Mon Sep 17 00:00:00 2001 From: leon mandel Date: Tue, 1 Oct 2024 13:29:29 +0200 Subject: [PATCH 11/47] fix rpcprovider server relay load handling --- protocol/rpcprovider/provider_load_manager.go | 2 +- protocol/rpcprovider/rpcprovider_server.go | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/protocol/rpcprovider/provider_load_manager.go b/protocol/rpcprovider/provider_load_manager.go index f991385b88..d91df522af 100644 --- a/protocol/rpcprovider/provider_load_manager.go +++ b/protocol/rpcprovider/provider_load_manager.go @@ -38,7 +38,7 @@ func (loadManager *ProviderLoadManager) subtractRelayCall() { func (loadManager *ProviderLoadManager) getProviderLoad() string { if loadManager == nil { - return "0" + return "" } loadedRateLimitThreshold := loadManager.rateLimitThreshold.Load() if loadedRateLimitThreshold == 0 { diff --git a/protocol/rpcprovider/rpcprovider_server.go b/protocol/rpcprovider/rpcprovider_server.go index 29952f9002..c118f38ee2 100644 --- a/protocol/rpcprovider/rpcprovider_server.go +++ b/protocol/rpcprovider/rpcprovider_server.go @@ -185,10 +185,10 @@ func (rpcps *RPCProviderServer) craftChainMessage() (chainMessage chainlib.Chain func (rpcps *RPCProviderServer) Relay(ctx context.Context, request *pairingtypes.RelayRequest) (*pairingtypes.RelayReply, error) { // count the number of simultaneous relay calls rpcps.providerLoadManager.addRelayCall() - defer rpcps.providerLoadManager.subtractRelayCall() - provider_relay_load := rpcps.providerLoadManager.getProviderLoad() - trailer_md := metadata.Pairs(chainlib.RpcProviderUniqueIdHeader, rpcps.providerUniqueId, chainlib.RpcProviderLoadRateHeader, provider_relay_load) - grpc.SetTrailer(ctx, trailer_md) + defer func() { go rpcps.providerLoadManager.subtractRelayCall() }() + provideRelayLoad := rpcps.providerLoadManager.getProviderLoad() + trailerMd := metadata.Pairs(chainlib.RpcProviderUniqueIdHeader, rpcps.providerUniqueId, chainlib.RpcProviderLoadRateHeader, provideRelayLoad) + grpc.SetTrailer(ctx, trailerMd) if request.RelayData == nil || request.RelaySession == nil { return nil, utils.LavaFormatWarning("invalid relay request, internal fields are nil", nil) } From 5e3b7a4cc36161970d87916541c9d7db8ac058e5 Mon Sep 17 00:00:00 2001 From: leon mandel Date: Tue, 1 Oct 2024 13:31:45 +0200 Subject: [PATCH 12/47] fix tests --- protocol/integration/protocol_test.go | 6 +----- scripts/pre_setups/init_lava_only_with_node.sh | 9 ++------- 2 files changed, 3 insertions(+), 12 deletions(-) diff --git a/protocol/integration/protocol_test.go b/protocol/integration/protocol_test.go index deee9f61dc..c006fd5abd 100644 --- a/protocol/integration/protocol_test.go +++ b/protocol/integration/protocol_test.go @@ -344,11 +344,7 @@ func createRpcProvider(t *testing.T, ctx context.Context, rpcProviderOptions rpc chainTracker.StartAndServe(ctx) reliabilityManager := reliabilitymanager.NewReliabilityManager(chainTracker, &mockProviderStateTracker, rpcProviderOptions.account.Addr.String(), chainRouter, chainParser) mockReliabilityManager := NewMockReliabilityManager(reliabilityManager) - providerLoadManager := &rpcprovider.ProviderLoadManager{ - TotalSimultaneousRelays: 0, - ActiveRequestsPerSecond: 0, - } - rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rws, providerSessionManager, mockReliabilityManager, rpcProviderOptions.account.SK, cache, chainRouter, &mockProviderStateTracker, rpcProviderOptions.account.Addr, rpcProviderOptions.lavaChainID, rpcprovider.DEFAULT_ALLOWED_MISSING_CU, nil, nil, nil, false, providerLoadManager) + rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rws, providerSessionManager, mockReliabilityManager, rpcProviderOptions.account.SK, cache, chainRouter, &mockProviderStateTracker, rpcProviderOptions.account.Addr, rpcProviderOptions.lavaChainID, rpcprovider.DEFAULT_ALLOWED_MISSING_CU, nil, nil, nil, false, 0) listener := rpcprovider.NewProviderListener(ctx, rpcProviderEndpoint.NetworkAddress, "/health") err = listener.RegisterReceiver(rpcProviderServer, rpcProviderEndpoint) require.NoError(t, err) diff --git a/scripts/pre_setups/init_lava_only_with_node.sh b/scripts/pre_setups/init_lava_only_with_node.sh index 6fa5ee7f1c..ea3dc23ea9 100755 --- a/scripts/pre_setups/init_lava_only_with_node.sh +++ b/scripts/pre_setups/init_lava_only_with_node.sh @@ -51,7 +51,7 @@ screen -d -m -S provider1 bash -c "source ~/.bashrc; lavap rpcprovider \ $PROVIDER1_LISTENER LAV1 rest '$LAVA_REST' \ $PROVIDER1_LISTENER LAV1 tendermintrpc '$LAVA_RPC,$LAVA_RPC_WS' \ $PROVIDER1_LISTENER LAV1 grpc '$LAVA_GRPC' \ -$EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level trace --from servicer1 --chain-id lava --relay-concurrent-load-limit 1 --metrics-listen-address ":7776" 2>&1 | tee $LOGS_DIR/PROVIDER1.log" && sleep 0.25 +$EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level trace --from servicer1 --chain-id lava --metrics-listen-address ":7776" 2>&1 | tee $LOGS_DIR/PROVIDER1.log" && sleep 0.25 wait_next_block @@ -59,10 +59,5 @@ screen -d -m -S consumers bash -c "source ~/.bashrc; lavap rpcconsumer \ 127.0.0.1:3360 LAV1 rest 127.0.0.1:3361 LAV1 tendermintrpc 127.0.0.1:3362 LAV1 grpc \ $EXTRA_PORTAL_FLAGS --geolocation 1 --log_level trace --from user1 --chain-id lava --add-api-method-metrics --allow-insecure-provider-dialing --metrics-listen-address ":7779" 2>&1 | tee $LOGS_DIR/CONSUMERS.log" && sleep 0.25 -screen -d -m -S consumers bash -c "source ~/.bashrc; lavap rpcconsumer \ -127.0.0.1:3370 LAV1 rest 127.0.0.1:3371 LAV1 tendermintrpc 127.0.0.1:3372 LAV1 grpc \ -$EXTRA_PORTAL_FLAGS --geolocation 1 --log_level trace --from user1 --chain-id lava --add-api-method-metrics --allow-insecure-provider-dialing --metrics-listen-address ":7779" 2>&1 | tee $LOGS_DIR/CONSUMERS.log" && sleep 0.25 - - echo "--- setting up screens done ---" -screen -ls \ No newline at end of file +screen -ls From 199ff2c86ee676f46be5ea1b9928fb0b3a1d695f Mon Sep 17 00:00:00 2001 From: leon mandel Date: Tue, 1 Oct 2024 13:37:27 +0200 Subject: [PATCH 13/47] fix typo --- protocol/rpcprovider/rpcprovider.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocol/rpcprovider/rpcprovider.go b/protocol/rpcprovider/rpcprovider.go index 93eb6742c6..94a2816d1b 100644 --- a/protocol/rpcprovider/rpcprovider.go +++ b/protocol/rpcprovider/rpcprovider.go @@ -722,7 +722,7 @@ rpcprovider 127.0.0.1:3333 OSMOSIS tendermintrpc "wss://www.node-path.com:80,htt } relayLoadLimit, err := cmd.Flags().GetUint64(common.RateLimitRequestPerSecondFlag) if err != nil { - utils.LavaFormatFatal("failed to read relay concurrent loadl limit flag", err) + utils.LavaFormatFatal("failed to read relay concurrent load limit flag", err) } prometheusListenAddr := viper.GetString(metrics.MetricsListenFlagName) rewardStoragePath := viper.GetString(rewardserver.RewardServerStorageFlagName) From 9faf8ef9a5b34bc970bed505ab6ce31b2e384eae Mon Sep 17 00:00:00 2001 From: leon mandel Date: Tue, 1 Oct 2024 13:46:04 +0200 Subject: [PATCH 14/47] fix init lava script --- scripts/pre_setups/init_lava_only_with_node.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/pre_setups/init_lava_only_with_node.sh b/scripts/pre_setups/init_lava_only_with_node.sh index ea3dc23ea9..d99ddc4094 100755 --- a/scripts/pre_setups/init_lava_only_with_node.sh +++ b/scripts/pre_setups/init_lava_only_with_node.sh @@ -60,4 +60,4 @@ screen -d -m -S consumers bash -c "source ~/.bashrc; lavap rpcconsumer \ $EXTRA_PORTAL_FLAGS --geolocation 1 --log_level trace --from user1 --chain-id lava --add-api-method-metrics --allow-insecure-provider-dialing --metrics-listen-address ":7779" 2>&1 | tee $LOGS_DIR/CONSUMERS.log" && sleep 0.25 echo "--- setting up screens done ---" -screen -ls +screen -ls \ No newline at end of file From 56191f626d3c96ac8a2dc3ec7699922f6bf83e94 Mon Sep 17 00:00:00 2001 From: leon mandel Date: Tue, 1 Oct 2024 14:59:14 +0200 Subject: [PATCH 15/47] fix provider load manager --- protocol/rpcprovider/provider_load_manager.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/protocol/rpcprovider/provider_load_manager.go b/protocol/rpcprovider/provider_load_manager.go index d91df522af..6395ddf037 100644 --- a/protocol/rpcprovider/provider_load_manager.go +++ b/protocol/rpcprovider/provider_load_manager.go @@ -17,7 +17,6 @@ func NewProviderLoadManager(rateLimitThreshold uint64) *ProviderLoadManager { loadManager := &ProviderLoadManager{} loadManager.rateLimitThreshold.Store(rateLimitThreshold) - loadManager.activeRequestsPerSecond.Store(0) return loadManager } @@ -40,10 +39,6 @@ func (loadManager *ProviderLoadManager) getProviderLoad() string { if loadManager == nil { return "" } - loadedRateLimitThreshold := loadManager.rateLimitThreshold.Load() - if loadedRateLimitThreshold == 0 { - return "0" - } - return strconv.FormatUint(loadManager.activeRequestsPerSecond.Load()/loadedRateLimitThreshold, 10) + return strconv.FormatUint(loadManager.activeRequestsPerSecond.Load()/loadManager.rateLimitThreshold.Load(), 10) } From 9eabb5ac988c7c12d533a62a1f2bfa88eeb9425a Mon Sep 17 00:00:00 2001 From: leon mandel Date: Tue, 1 Oct 2024 15:39:55 +0200 Subject: [PATCH 16/47] fix provider server and load manager --- protocol/rpcprovider/provider_load_manager.go | 15 +++++++++++++++ protocol/rpcprovider/rpcprovider.go | 4 ++-- protocol/rpcprovider/rpcprovider_server.go | 8 ++++---- 3 files changed, 21 insertions(+), 6 deletions(-) diff --git a/protocol/rpcprovider/provider_load_manager.go b/protocol/rpcprovider/provider_load_manager.go index 6395ddf037..5563b7b936 100644 --- a/protocol/rpcprovider/provider_load_manager.go +++ b/protocol/rpcprovider/provider_load_manager.go @@ -1,8 +1,13 @@ package rpcprovider import ( + "context" "strconv" "sync/atomic" + + "github.com/lavanet/lava/v3/protocol/chainlib" + grpc "google.golang.org/grpc" + "google.golang.org/grpc/metadata" ) type ProviderLoadManager struct { @@ -42,3 +47,13 @@ func (loadManager *ProviderLoadManager) getProviderLoad() string { return strconv.FormatUint(loadManager.activeRequestsPerSecond.Load()/loadManager.rateLimitThreshold.Load(), 10) } + +func (loadManager *ProviderLoadManager) applyProviderLoadMetadataToContextTrailer(ctx context.Context) { + provideRelayLoad := loadManager.getProviderLoad() + if provideRelayLoad == "" { + return + } + + trailerMd := metadata.Pairs(chainlib.RpcProviderLoadRateHeader, provideRelayLoad) + grpc.SetTrailer(ctx, trailerMd) +} diff --git a/protocol/rpcprovider/rpcprovider.go b/protocol/rpcprovider/rpcprovider.go index 94a2816d1b..ab27308fc5 100644 --- a/protocol/rpcprovider/rpcprovider.go +++ b/protocol/rpcprovider/rpcprovider.go @@ -488,8 +488,8 @@ func (rpcp *RPCProvider) SetupEndpoint(ctx context.Context, rpcProviderEndpoint utils.LavaFormatTrace("Creating provider node subscription manager", utils.LogAttr("rpcProviderEndpoint", rpcProviderEndpoint)) providerNodeSubscriptionManager = chainlib.NewProviderNodeSubscriptionManager(chainRouter, chainParser, rpcProviderServer, rpcp.privKey) } - - rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rpcp.rewardServer, providerSessionManager, reliabilityManager, rpcp.privKey, rpcp.cache, chainRouter, rpcp.providerStateTracker, rpcp.addr, rpcp.lavaChainID, DEFAULT_ALLOWED_MISSING_CU, providerMetrics, relaysMonitor, providerNodeSubscriptionManager, rpcp.staticProvider, rpcp.relayLoadLimit) + relayLoadProvider := NewProviderLoadManager(rpcp.relayLoadLimit) + rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rpcp.rewardServer, providerSessionManager, reliabilityManager, rpcp.privKey, rpcp.cache, chainRouter, rpcp.providerStateTracker, rpcp.addr, rpcp.lavaChainID, DEFAULT_ALLOWED_MISSING_CU, providerMetrics, relaysMonitor, providerNodeSubscriptionManager, rpcp.staticProvider, relayLoadProvider) // set up grpc listener var listener *ProviderListener func() { diff --git a/protocol/rpcprovider/rpcprovider_server.go b/protocol/rpcprovider/rpcprovider_server.go index c118f38ee2..258f6fd28e 100644 --- a/protocol/rpcprovider/rpcprovider_server.go +++ b/protocol/rpcprovider/rpcprovider_server.go @@ -113,7 +113,7 @@ func (rpcps *RPCProviderServer) ServeRPCRequests( relaysMonitor *metrics.RelaysMonitor, providerNodeSubscriptionManager *chainlib.ProviderNodeSubscriptionManager, staticProvider bool, - relayLoadLimit uint64, + providerLoadManager *ProviderLoadManager, ) { rpcps.cache = cache rpcps.chainRouter = chainRouter @@ -136,7 +136,7 @@ func (rpcps *RPCProviderServer) ServeRPCRequests( rpcps.relaysMonitor = relaysMonitor rpcps.providerNodeSubscriptionManager = providerNodeSubscriptionManager rpcps.providerStateMachine = NewProviderStateMachine(rpcProviderEndpoint.ChainID, lavaprotocol.NewRelayRetriesManager(), chainRouter) - rpcps.providerLoadManager = NewProviderLoadManager(relayLoadLimit) + rpcps.providerLoadManager = providerLoadManager rpcps.initRelaysMonitor(ctx) } @@ -186,8 +186,8 @@ func (rpcps *RPCProviderServer) Relay(ctx context.Context, request *pairingtypes // count the number of simultaneous relay calls rpcps.providerLoadManager.addRelayCall() defer func() { go rpcps.providerLoadManager.subtractRelayCall() }() - provideRelayLoad := rpcps.providerLoadManager.getProviderLoad() - trailerMd := metadata.Pairs(chainlib.RpcProviderUniqueIdHeader, rpcps.providerUniqueId, chainlib.RpcProviderLoadRateHeader, provideRelayLoad) + rpcps.providerLoadManager.applyProviderLoadMetadataToContextTrailer(ctx) + trailerMd := metadata.Pairs(chainlib.RpcProviderUniqueIdHeader, rpcps.providerUniqueId) grpc.SetTrailer(ctx, trailerMd) if request.RelayData == nil || request.RelaySession == nil { return nil, utils.LavaFormatWarning("invalid relay request, internal fields are nil", nil) From ffdb9869589a5197e8a63a27940314650d11bdcd Mon Sep 17 00:00:00 2001 From: leon mandel Date: Tue, 1 Oct 2024 15:43:45 +0200 Subject: [PATCH 17/47] fix lint - fix protocol test --- protocol/integration/protocol_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocol/integration/protocol_test.go b/protocol/integration/protocol_test.go index c006fd5abd..accb844569 100644 --- a/protocol/integration/protocol_test.go +++ b/protocol/integration/protocol_test.go @@ -344,7 +344,7 @@ func createRpcProvider(t *testing.T, ctx context.Context, rpcProviderOptions rpc chainTracker.StartAndServe(ctx) reliabilityManager := reliabilitymanager.NewReliabilityManager(chainTracker, &mockProviderStateTracker, rpcProviderOptions.account.Addr.String(), chainRouter, chainParser) mockReliabilityManager := NewMockReliabilityManager(reliabilityManager) - rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rws, providerSessionManager, mockReliabilityManager, rpcProviderOptions.account.SK, cache, chainRouter, &mockProviderStateTracker, rpcProviderOptions.account.Addr, rpcProviderOptions.lavaChainID, rpcprovider.DEFAULT_ALLOWED_MISSING_CU, nil, nil, nil, false, 0) + rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rws, providerSessionManager, mockReliabilityManager, rpcProviderOptions.account.SK, cache, chainRouter, &mockProviderStateTracker, rpcProviderOptions.account.Addr, rpcProviderOptions.lavaChainID, rpcprovider.DEFAULT_ALLOWED_MISSING_CU, nil, nil, nil, false, nil) listener := rpcprovider.NewProviderListener(ctx, rpcProviderEndpoint.NetworkAddress, "/health") err = listener.RegisterReceiver(rpcProviderServer, rpcProviderEndpoint) require.NoError(t, err) From 19cc45470b2099c015de70b56632176023b2703c Mon Sep 17 00:00:00 2001 From: leon mandel Date: Tue, 1 Oct 2024 16:09:17 +0200 Subject: [PATCH 18/47] fix provider load manager applyProviderLoadMetadataToContextTrailer --- protocol/rpcprovider/provider_load_manager.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/protocol/rpcprovider/provider_load_manager.go b/protocol/rpcprovider/provider_load_manager.go index 5563b7b936..712883f5dd 100644 --- a/protocol/rpcprovider/provider_load_manager.go +++ b/protocol/rpcprovider/provider_load_manager.go @@ -49,6 +49,10 @@ func (loadManager *ProviderLoadManager) getProviderLoad() string { } func (loadManager *ProviderLoadManager) applyProviderLoadMetadataToContextTrailer(ctx context.Context) { + if loadManager == nil { + return + } + provideRelayLoad := loadManager.getProviderLoad() if provideRelayLoad == "" { return From 5a29efda7808a0acaac112842467b70e819f1a23 Mon Sep 17 00:00:00 2001 From: leon mandel Date: Tue, 1 Oct 2024 17:53:21 +0200 Subject: [PATCH 19/47] change cmdRPCProvider load rate flag to uint64 --- protocol/rpcprovider/rpcprovider.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocol/rpcprovider/rpcprovider.go b/protocol/rpcprovider/rpcprovider.go index ab27308fc5..17b939f1bd 100644 --- a/protocol/rpcprovider/rpcprovider.go +++ b/protocol/rpcprovider/rpcprovider.go @@ -798,7 +798,7 @@ rpcprovider 127.0.0.1:3333 OSMOSIS tendermintrpc "wss://www.node-path.com:80,htt cmdRPCProvider.Flags().BoolVar(&chainlib.IgnoreSubscriptionNotConfiguredError, chainlib.IgnoreSubscriptionNotConfiguredErrorFlag, chainlib.IgnoreSubscriptionNotConfiguredError, "ignore webSocket node url not configured error, when subscription is enabled in spec") cmdRPCProvider.Flags().IntVar(&numberOfRetriesAllowedOnNodeErrors, common.SetRelayCountOnNodeErrorFlag, 2, "set the number of retries attempt on node errors") cmdRPCProvider.Flags().String(common.UseStaticSpecFlag, "", "load offline spec provided path to spec file, used to test specs before they are proposed on chain, example for spec with inheritance: --use-static-spec ./cookbook/specs/ibc.json,./cookbook/specs/tendermint.json,./cookbook/specs/cosmossdk.json,./cookbook/specs/ethermint.json,./cookbook/specs/ethereum.json,./cookbook/specs/evmos.json") - cmdRPCProvider.Flags().Uint(common.RateLimitRequestPerSecondFlag, 0, "rate limit requests per second - per chain - default unlimited") + cmdRPCProvider.Flags().Uint64(common.RateLimitRequestPerSecondFlag, 0, "rate limit requests per second - per chain - default unlimited") common.AddRollingLogConfig(cmdRPCProvider) return cmdRPCProvider } From 93c322031d71d2137c28dc201cc33a4c433819df Mon Sep 17 00:00:00 2001 From: leon mandel Date: Tue, 1 Oct 2024 18:18:59 +0200 Subject: [PATCH 20/47] try fix --- protocol/rpcprovider/rpcprovider.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocol/rpcprovider/rpcprovider.go b/protocol/rpcprovider/rpcprovider.go index 17b939f1bd..5036883a25 100644 --- a/protocol/rpcprovider/rpcprovider.go +++ b/protocol/rpcprovider/rpcprovider.go @@ -761,7 +761,7 @@ rpcprovider 127.0.0.1:3333 OSMOSIS tendermintrpc "wss://www.node-path.com:80,htt &rpcProviderHealthCheckMetricsOptions, staticProvider, offlineSpecPath, - relayLoadLimit, + uint64(relayLoadLimit), } rpcProvider := RPCProvider{} From eeb46eae0ad49307a63798b3a644708f74876c7d Mon Sep 17 00:00:00 2001 From: leon mandel Date: Tue, 1 Oct 2024 18:32:14 +0200 Subject: [PATCH 21/47] fix cmd flag reading --- protocol/rpcprovider/rpcprovider.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/protocol/rpcprovider/rpcprovider.go b/protocol/rpcprovider/rpcprovider.go index 5036883a25..8bb0366153 100644 --- a/protocol/rpcprovider/rpcprovider.go +++ b/protocol/rpcprovider/rpcprovider.go @@ -720,7 +720,7 @@ rpcprovider 127.0.0.1:3333 OSMOSIS tendermintrpc "wss://www.node-path.com:80,htt if stickinessHeaderName != "" { RPCProviderStickinessHeaderName = stickinessHeaderName } - relayLoadLimit, err := cmd.Flags().GetUint64(common.RateLimitRequestPerSecondFlag) + relayLoadLimit, err := cmd.Flags().GetUint(common.RateLimitRequestPerSecondFlag) if err != nil { utils.LavaFormatFatal("failed to read relay concurrent load limit flag", err) } @@ -798,7 +798,7 @@ rpcprovider 127.0.0.1:3333 OSMOSIS tendermintrpc "wss://www.node-path.com:80,htt cmdRPCProvider.Flags().BoolVar(&chainlib.IgnoreSubscriptionNotConfiguredError, chainlib.IgnoreSubscriptionNotConfiguredErrorFlag, chainlib.IgnoreSubscriptionNotConfiguredError, "ignore webSocket node url not configured error, when subscription is enabled in spec") cmdRPCProvider.Flags().IntVar(&numberOfRetriesAllowedOnNodeErrors, common.SetRelayCountOnNodeErrorFlag, 2, "set the number of retries attempt on node errors") cmdRPCProvider.Flags().String(common.UseStaticSpecFlag, "", "load offline spec provided path to spec file, used to test specs before they are proposed on chain, example for spec with inheritance: --use-static-spec ./cookbook/specs/ibc.json,./cookbook/specs/tendermint.json,./cookbook/specs/cosmossdk.json,./cookbook/specs/ethermint.json,./cookbook/specs/ethereum.json,./cookbook/specs/evmos.json") - cmdRPCProvider.Flags().Uint64(common.RateLimitRequestPerSecondFlag, 0, "rate limit requests per second - per chain - default unlimited") + cmdRPCProvider.Flags().Uint(common.RateLimitRequestPerSecondFlag, 0, "rate limit requests per second - per chain - default unlimited") common.AddRollingLogConfig(cmdRPCProvider) return cmdRPCProvider } From 55221f68b6ee776ffa0aa21e009b99be8fe63836 Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Wed, 2 Oct 2024 11:33:08 +0200 Subject: [PATCH 22/47] adjusting uint64 --- protocol/rpcprovider/rpcprovider.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/protocol/rpcprovider/rpcprovider.go b/protocol/rpcprovider/rpcprovider.go index 8bb0366153..2a2b0dc705 100644 --- a/protocol/rpcprovider/rpcprovider.go +++ b/protocol/rpcprovider/rpcprovider.go @@ -720,10 +720,7 @@ rpcprovider 127.0.0.1:3333 OSMOSIS tendermintrpc "wss://www.node-path.com:80,htt if stickinessHeaderName != "" { RPCProviderStickinessHeaderName = stickinessHeaderName } - relayLoadLimit, err := cmd.Flags().GetUint(common.RateLimitRequestPerSecondFlag) - if err != nil { - utils.LavaFormatFatal("failed to read relay concurrent load limit flag", err) - } + relayLoadLimit := viper.GetUint64(common.RateLimitRequestPerSecondFlag) prometheusListenAddr := viper.GetString(metrics.MetricsListenFlagName) rewardStoragePath := viper.GetString(rewardserver.RewardServerStorageFlagName) rewardTTL := viper.GetDuration(rewardserver.RewardTTLFlagName) @@ -761,7 +758,7 @@ rpcprovider 127.0.0.1:3333 OSMOSIS tendermintrpc "wss://www.node-path.com:80,htt &rpcProviderHealthCheckMetricsOptions, staticProvider, offlineSpecPath, - uint64(relayLoadLimit), + relayLoadLimit, } rpcProvider := RPCProvider{} @@ -798,7 +795,7 @@ rpcprovider 127.0.0.1:3333 OSMOSIS tendermintrpc "wss://www.node-path.com:80,htt cmdRPCProvider.Flags().BoolVar(&chainlib.IgnoreSubscriptionNotConfiguredError, chainlib.IgnoreSubscriptionNotConfiguredErrorFlag, chainlib.IgnoreSubscriptionNotConfiguredError, "ignore webSocket node url not configured error, when subscription is enabled in spec") cmdRPCProvider.Flags().IntVar(&numberOfRetriesAllowedOnNodeErrors, common.SetRelayCountOnNodeErrorFlag, 2, "set the number of retries attempt on node errors") cmdRPCProvider.Flags().String(common.UseStaticSpecFlag, "", "load offline spec provided path to spec file, used to test specs before they are proposed on chain, example for spec with inheritance: --use-static-spec ./cookbook/specs/ibc.json,./cookbook/specs/tendermint.json,./cookbook/specs/cosmossdk.json,./cookbook/specs/ethermint.json,./cookbook/specs/ethereum.json,./cookbook/specs/evmos.json") - cmdRPCProvider.Flags().Uint(common.RateLimitRequestPerSecondFlag, 0, "rate limit requests per second - per chain - default unlimited") + cmdRPCProvider.Flags().Uint64(common.RateLimitRequestPerSecondFlag, 0, "rate limit requests per second - per chain - default unlimited") common.AddRollingLogConfig(cmdRPCProvider) return cmdRPCProvider } From e81afb9004709ba9c2f23e147b30b18de21c4873 Mon Sep 17 00:00:00 2001 From: leon mandel Date: Thu, 3 Oct 2024 14:29:14 +0200 Subject: [PATCH 23/47] fix redundent nil check in provider load manager --- protocol/rpcprovider/provider_load_manager.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/protocol/rpcprovider/provider_load_manager.go b/protocol/rpcprovider/provider_load_manager.go index 712883f5dd..5563b7b936 100644 --- a/protocol/rpcprovider/provider_load_manager.go +++ b/protocol/rpcprovider/provider_load_manager.go @@ -49,10 +49,6 @@ func (loadManager *ProviderLoadManager) getProviderLoad() string { } func (loadManager *ProviderLoadManager) applyProviderLoadMetadataToContextTrailer(ctx context.Context) { - if loadManager == nil { - return - } - provideRelayLoad := loadManager.getProviderLoad() if provideRelayLoad == "" { return From 725f40a010e5e88abc7bbdadc1194e0536ab3874 Mon Sep 17 00:00:00 2001 From: leon mandel Date: Thu, 3 Oct 2024 15:38:07 +0200 Subject: [PATCH 24/47] fix providerLoadManager per chain creation --- protocol/rpcprovider/rpcprovider.go | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/protocol/rpcprovider/rpcprovider.go b/protocol/rpcprovider/rpcprovider.go index 2a2b0dc705..68df8e346f 100644 --- a/protocol/rpcprovider/rpcprovider.go +++ b/protocol/rpcprovider/rpcprovider.go @@ -310,21 +310,25 @@ func (rpcp *RPCProvider) SetupProviderEndpoints(rpcProviderEndpoints []*lavasess wg.Add(parallelJobs) disabledEndpoints := make(chan *lavasession.RPCProviderEndpoint, parallelJobs) // validate static spec configuration is used only on a single chain setup. - chainIds := make(map[string]struct{}) + chainIds := make(map[string]*ProviderLoadManager) for _, rpcProviderEndpoint := range rpcProviderEndpoints { - chainIds[rpcProviderEndpoint.ChainID] = struct{}{} - setupEndpoint := func(rpcProviderEndpoint *lavasession.RPCProviderEndpoint, specValidator *SpecValidator) { + providerLoadManager, keyExists := chainIds[rpcProviderEndpoint.ChainID] + if !keyExists { + providerLoadManager = NewProviderLoadManager(rpcp.relayLoadLimit) + chainIds[rpcProviderEndpoint.ChainID] = providerLoadManager + } + setupEndpoint := func(rpcProviderEndpoint *lavasession.RPCProviderEndpoint, specValidator *SpecValidator, providerLoadManager *ProviderLoadManager) { defer wg.Done() - err := rpcp.SetupEndpoint(context.Background(), rpcProviderEndpoint, specValidator) + err := rpcp.SetupEndpoint(context.Background(), rpcProviderEndpoint, specValidator, providerLoadManager) if err != nil { rpcp.providerMetricsManager.SetDisabledChain(rpcProviderEndpoint.ChainID, rpcProviderEndpoint.ApiInterface) disabledEndpoints <- rpcProviderEndpoint } } if parallel { - go setupEndpoint(rpcProviderEndpoint, specValidator) + go setupEndpoint(rpcProviderEndpoint, specValidator, providerLoadManager) } else { - setupEndpoint(rpcProviderEndpoint, specValidator) + setupEndpoint(rpcProviderEndpoint, specValidator, providerLoadManager) } } wg.Wait() @@ -344,7 +348,7 @@ func GetAllAddonsAndExtensionsFromNodeUrlSlice(nodeUrls []common.NodeUrl) *Provi return policy } -func (rpcp *RPCProvider) SetupEndpoint(ctx context.Context, rpcProviderEndpoint *lavasession.RPCProviderEndpoint, specValidator *SpecValidator) error { +func (rpcp *RPCProvider) SetupEndpoint(ctx context.Context, rpcProviderEndpoint *lavasession.RPCProviderEndpoint, specValidator *SpecValidator, providerLoadManager *ProviderLoadManager) error { err := rpcProviderEndpoint.Validate() if err != nil { return utils.LavaFormatError("[PANIC] panic severity critical error, aborting support for chain api due to invalid node url definition, continuing with others", err, utils.Attribute{Key: "endpoint", Value: rpcProviderEndpoint.String()}) @@ -488,8 +492,7 @@ func (rpcp *RPCProvider) SetupEndpoint(ctx context.Context, rpcProviderEndpoint utils.LavaFormatTrace("Creating provider node subscription manager", utils.LogAttr("rpcProviderEndpoint", rpcProviderEndpoint)) providerNodeSubscriptionManager = chainlib.NewProviderNodeSubscriptionManager(chainRouter, chainParser, rpcProviderServer, rpcp.privKey) } - relayLoadProvider := NewProviderLoadManager(rpcp.relayLoadLimit) - rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rpcp.rewardServer, providerSessionManager, reliabilityManager, rpcp.privKey, rpcp.cache, chainRouter, rpcp.providerStateTracker, rpcp.addr, rpcp.lavaChainID, DEFAULT_ALLOWED_MISSING_CU, providerMetrics, relaysMonitor, providerNodeSubscriptionManager, rpcp.staticProvider, relayLoadProvider) + rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rpcp.rewardServer, providerSessionManager, reliabilityManager, rpcp.privKey, rpcp.cache, chainRouter, rpcp.providerStateTracker, rpcp.addr, rpcp.lavaChainID, DEFAULT_ALLOWED_MISSING_CU, providerMetrics, relaysMonitor, providerNodeSubscriptionManager, rpcp.staticProvider, providerLoadManager) // set up grpc listener var listener *ProviderListener func() { From 6da0e99dcd15308e8b572d5e2a1e64bf327d944a Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Thu, 3 Oct 2024 17:59:40 +0200 Subject: [PATCH 25/47] rename and fix instance passing unnecessarily --- protocol/rpcprovider/rpcprovider.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/protocol/rpcprovider/rpcprovider.go b/protocol/rpcprovider/rpcprovider.go index 68df8e346f..c02575c013 100644 --- a/protocol/rpcprovider/rpcprovider.go +++ b/protocol/rpcprovider/rpcprovider.go @@ -310,14 +310,14 @@ func (rpcp *RPCProvider) SetupProviderEndpoints(rpcProviderEndpoints []*lavasess wg.Add(parallelJobs) disabledEndpoints := make(chan *lavasession.RPCProviderEndpoint, parallelJobs) // validate static spec configuration is used only on a single chain setup. - chainIds := make(map[string]*ProviderLoadManager) + providerLoadManagersPerChain := make(map[string]*ProviderLoadManager) for _, rpcProviderEndpoint := range rpcProviderEndpoints { - providerLoadManager, keyExists := chainIds[rpcProviderEndpoint.ChainID] + providerLoadManager, keyExists := providerLoadManagersPerChain[rpcProviderEndpoint.ChainID] if !keyExists { providerLoadManager = NewProviderLoadManager(rpcp.relayLoadLimit) - chainIds[rpcProviderEndpoint.ChainID] = providerLoadManager + providerLoadManagersPerChain[rpcProviderEndpoint.ChainID] = providerLoadManager } - setupEndpoint := func(rpcProviderEndpoint *lavasession.RPCProviderEndpoint, specValidator *SpecValidator, providerLoadManager *ProviderLoadManager) { + setupEndpoint := func(rpcProviderEndpoint *lavasession.RPCProviderEndpoint, specValidator *SpecValidator) { defer wg.Done() err := rpcp.SetupEndpoint(context.Background(), rpcProviderEndpoint, specValidator, providerLoadManager) if err != nil { @@ -326,9 +326,9 @@ func (rpcp *RPCProvider) SetupProviderEndpoints(rpcProviderEndpoints []*lavasess } } if parallel { - go setupEndpoint(rpcProviderEndpoint, specValidator, providerLoadManager) + go setupEndpoint(rpcProviderEndpoint, specValidator) } else { - setupEndpoint(rpcProviderEndpoint, specValidator, providerLoadManager) + setupEndpoint(rpcProviderEndpoint, specValidator) } } wg.Wait() From 9458d6c71c4809897bbaecf7b11742151c064e2a Mon Sep 17 00:00:00 2001 From: leon mandel Date: Sun, 6 Oct 2024 13:17:45 +0200 Subject: [PATCH 26/47] fixed chainlib common formatting --- protocol/chainlib/common.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocol/chainlib/common.go b/protocol/chainlib/common.go index 0ea2a6c97d..80291cf608 100644 --- a/protocol/chainlib/common.go +++ b/protocol/chainlib/common.go @@ -30,7 +30,7 @@ const ( relayMsgLogMaxChars = 200 RPCProviderNodeAddressHash = "Lava-Provider-Node-Address-Hash" RPCProviderNodeExtension = "Lava-Provider-Node-Extension" - RpcProviderLoadRateHeader = "Lava-provider-load-rate" + RpcProviderLoadRateHeader = "Lava-Provider-load-rate" RpcProviderUniqueIdHeader = "Lava-Provider-Unique-Id" WebSocketExtension = "websocket" ) From 44a5e5c1624b10f6a2bdc7dffd3b354a8203b0be Mon Sep 17 00:00:00 2001 From: leon mandel Date: Sun, 6 Oct 2024 13:31:38 +0200 Subject: [PATCH 27/47] fix provider load manager comments --- protocol/rpcprovider/provider_load_manager.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/protocol/rpcprovider/provider_load_manager.go b/protocol/rpcprovider/provider_load_manager.go index 5563b7b936..c3abe6c141 100644 --- a/protocol/rpcprovider/provider_load_manager.go +++ b/protocol/rpcprovider/provider_load_manager.go @@ -37,15 +37,16 @@ func (loadManager *ProviderLoadManager) subtractRelayCall() { if loadManager == nil { return } - loadManager.activeRequestsPerSecond.Add(^uint64(0)) + loadManager.activeRequestsPerSecond.Add(uint64(0)) } func (loadManager *ProviderLoadManager) getProviderLoad() string { if loadManager == nil { return "" } - - return strconv.FormatUint(loadManager.activeRequestsPerSecond.Load()/loadManager.rateLimitThreshold.Load(), 10) + activeRequests := loadManager.activeRequestsPerSecond.Load() + rateLimitThreshold := loadManager.rateLimitThreshold.Load() + return strconv.FormatUint(activeRequests/rateLimitThreshold, 10) } func (loadManager *ProviderLoadManager) applyProviderLoadMetadataToContextTrailer(ctx context.Context) { From 03e1b1783a89794ed146df07c792e9b38c6673b7 Mon Sep 17 00:00:00 2001 From: leon mandel Date: Sun, 6 Oct 2024 13:54:28 +0200 Subject: [PATCH 28/47] fix e2e tests --- protocol/rpcprovider/provider_load_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocol/rpcprovider/provider_load_manager.go b/protocol/rpcprovider/provider_load_manager.go index c3abe6c141..4d65959f50 100644 --- a/protocol/rpcprovider/provider_load_manager.go +++ b/protocol/rpcprovider/provider_load_manager.go @@ -37,7 +37,7 @@ func (loadManager *ProviderLoadManager) subtractRelayCall() { if loadManager == nil { return } - loadManager.activeRequestsPerSecond.Add(uint64(0)) + loadManager.activeRequestsPerSecond.Add(^uint64(0)) } func (loadManager *ProviderLoadManager) getProviderLoad() string { From c4bc4ec9807b7a5b5613472e5303939a48878295 Mon Sep 17 00:00:00 2001 From: leon mandel Date: Sun, 6 Oct 2024 14:01:00 +0200 Subject: [PATCH 29/47] fix pr - unite add relay load and set trailer --- protocol/chainlib/common.go | 2 +- protocol/rpcprovider/provider_load_manager.go | 5 +++++ protocol/rpcprovider/rpcprovider_server.go | 3 +-- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/protocol/chainlib/common.go b/protocol/chainlib/common.go index 80291cf608..e580c9f440 100644 --- a/protocol/chainlib/common.go +++ b/protocol/chainlib/common.go @@ -30,7 +30,7 @@ const ( relayMsgLogMaxChars = 200 RPCProviderNodeAddressHash = "Lava-Provider-Node-Address-Hash" RPCProviderNodeExtension = "Lava-Provider-Node-Extension" - RpcProviderLoadRateHeader = "Lava-Provider-load-rate" + RpcProviderLoadRateHeader = "Lava-Provider-Load-Rate" RpcProviderUniqueIdHeader = "Lava-Provider-Unique-Id" WebSocketExtension = "websocket" ) diff --git a/protocol/rpcprovider/provider_load_manager.go b/protocol/rpcprovider/provider_load_manager.go index 4d65959f50..36346a18c2 100644 --- a/protocol/rpcprovider/provider_load_manager.go +++ b/protocol/rpcprovider/provider_load_manager.go @@ -58,3 +58,8 @@ func (loadManager *ProviderLoadManager) applyProviderLoadMetadataToContextTraile trailerMd := metadata.Pairs(chainlib.RpcProviderLoadRateHeader, provideRelayLoad) grpc.SetTrailer(ctx, trailerMd) } + +func (loadManager *ProviderLoadManager) addAndSetRelayLoadToContextTrailer(ctx context.Context) { + loadManager.addRelayCall() + loadManager.applyProviderLoadMetadataToContextTrailer(ctx) +} diff --git a/protocol/rpcprovider/rpcprovider_server.go b/protocol/rpcprovider/rpcprovider_server.go index 258f6fd28e..d4d4621b9a 100644 --- a/protocol/rpcprovider/rpcprovider_server.go +++ b/protocol/rpcprovider/rpcprovider_server.go @@ -184,9 +184,8 @@ func (rpcps *RPCProviderServer) craftChainMessage() (chainMessage chainlib.Chain // function used to handle relay requests from a consumer, it is called by a provider_listener by calling RegisterReceiver func (rpcps *RPCProviderServer) Relay(ctx context.Context, request *pairingtypes.RelayRequest) (*pairingtypes.RelayReply, error) { // count the number of simultaneous relay calls - rpcps.providerLoadManager.addRelayCall() + rpcps.providerLoadManager.addAndSetRelayLoadToContextTrailer(ctx) defer func() { go rpcps.providerLoadManager.subtractRelayCall() }() - rpcps.providerLoadManager.applyProviderLoadMetadataToContextTrailer(ctx) trailerMd := metadata.Pairs(chainlib.RpcProviderUniqueIdHeader, rpcps.providerUniqueId) grpc.SetTrailer(ctx, trailerMd) if request.RelayData == nil || request.RelaySession == nil { From 2bd9032c10e9fec018a8ac7179a6ce22d7ea7863 Mon Sep 17 00:00:00 2001 From: leon mandel Date: Sun, 6 Oct 2024 14:05:33 +0200 Subject: [PATCH 30/47] fix common.go provider load header --- protocol/chainlib/common.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocol/chainlib/common.go b/protocol/chainlib/common.go index e580c9f440..80291cf608 100644 --- a/protocol/chainlib/common.go +++ b/protocol/chainlib/common.go @@ -30,7 +30,7 @@ const ( relayMsgLogMaxChars = 200 RPCProviderNodeAddressHash = "Lava-Provider-Node-Address-Hash" RPCProviderNodeExtension = "Lava-Provider-Node-Extension" - RpcProviderLoadRateHeader = "Lava-Provider-Load-Rate" + RpcProviderLoadRateHeader = "Lava-Provider-load-rate" RpcProviderUniqueIdHeader = "Lava-Provider-Unique-Id" WebSocketExtension = "websocket" ) From a44f0abca9af27fcdadd4c139581c151cf8374a7 Mon Sep 17 00:00:00 2001 From: leon mandel Date: Sun, 6 Oct 2024 14:09:09 +0200 Subject: [PATCH 31/47] fix edge case of getProviderLoad --- protocol/rpcprovider/provider_load_manager.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/protocol/rpcprovider/provider_load_manager.go b/protocol/rpcprovider/provider_load_manager.go index 36346a18c2..9ed20a89c1 100644 --- a/protocol/rpcprovider/provider_load_manager.go +++ b/protocol/rpcprovider/provider_load_manager.go @@ -44,8 +44,11 @@ func (loadManager *ProviderLoadManager) getProviderLoad() string { if loadManager == nil { return "" } - activeRequests := loadManager.activeRequestsPerSecond.Load() rateLimitThreshold := loadManager.rateLimitThreshold.Load() + if rateLimitThreshold == 0 { + return "" + } + activeRequests := loadManager.activeRequestsPerSecond.Load() return strconv.FormatUint(activeRequests/rateLimitThreshold, 10) } From c88aee2c34028cb1b02cafcbe8cdf2daade43c3d Mon Sep 17 00:00:00 2001 From: leon mandel Date: Tue, 8 Oct 2024 11:23:11 +0200 Subject: [PATCH 32/47] fix command flag description --- protocol/rpcprovider/rpcprovider.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocol/rpcprovider/rpcprovider.go b/protocol/rpcprovider/rpcprovider.go index c02575c013..b33ea5b8ec 100644 --- a/protocol/rpcprovider/rpcprovider.go +++ b/protocol/rpcprovider/rpcprovider.go @@ -798,7 +798,7 @@ rpcprovider 127.0.0.1:3333 OSMOSIS tendermintrpc "wss://www.node-path.com:80,htt cmdRPCProvider.Flags().BoolVar(&chainlib.IgnoreSubscriptionNotConfiguredError, chainlib.IgnoreSubscriptionNotConfiguredErrorFlag, chainlib.IgnoreSubscriptionNotConfiguredError, "ignore webSocket node url not configured error, when subscription is enabled in spec") cmdRPCProvider.Flags().IntVar(&numberOfRetriesAllowedOnNodeErrors, common.SetRelayCountOnNodeErrorFlag, 2, "set the number of retries attempt on node errors") cmdRPCProvider.Flags().String(common.UseStaticSpecFlag, "", "load offline spec provided path to spec file, used to test specs before they are proposed on chain, example for spec with inheritance: --use-static-spec ./cookbook/specs/ibc.json,./cookbook/specs/tendermint.json,./cookbook/specs/cosmossdk.json,./cookbook/specs/ethermint.json,./cookbook/specs/ethereum.json,./cookbook/specs/evmos.json") - cmdRPCProvider.Flags().Uint64(common.RateLimitRequestPerSecondFlag, 0, "rate limit requests per second - per chain - default unlimited") + cmdRPCProvider.Flags().Uint64(common.RateLimitRequestPerSecondFlag, 0, "Measuring the load relative to this number for feedback - per second - per chain - default unlimited. Given Y simultaneous relay calls, a value of X and will measure Y/X load rate") common.AddRollingLogConfig(cmdRPCProvider) return cmdRPCProvider } From 30efbf14e2d923799c17efe793e7eb17b87c1b33 Mon Sep 17 00:00:00 2001 From: leon mandel Date: Tue, 8 Oct 2024 11:23:46 +0200 Subject: [PATCH 33/47] fix command flag description --- protocol/rpcprovider/rpcprovider.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocol/rpcprovider/rpcprovider.go b/protocol/rpcprovider/rpcprovider.go index b33ea5b8ec..77fc70a304 100644 --- a/protocol/rpcprovider/rpcprovider.go +++ b/protocol/rpcprovider/rpcprovider.go @@ -798,7 +798,7 @@ rpcprovider 127.0.0.1:3333 OSMOSIS tendermintrpc "wss://www.node-path.com:80,htt cmdRPCProvider.Flags().BoolVar(&chainlib.IgnoreSubscriptionNotConfiguredError, chainlib.IgnoreSubscriptionNotConfiguredErrorFlag, chainlib.IgnoreSubscriptionNotConfiguredError, "ignore webSocket node url not configured error, when subscription is enabled in spec") cmdRPCProvider.Flags().IntVar(&numberOfRetriesAllowedOnNodeErrors, common.SetRelayCountOnNodeErrorFlag, 2, "set the number of retries attempt on node errors") cmdRPCProvider.Flags().String(common.UseStaticSpecFlag, "", "load offline spec provided path to spec file, used to test specs before they are proposed on chain, example for spec with inheritance: --use-static-spec ./cookbook/specs/ibc.json,./cookbook/specs/tendermint.json,./cookbook/specs/cosmossdk.json,./cookbook/specs/ethermint.json,./cookbook/specs/ethereum.json,./cookbook/specs/evmos.json") - cmdRPCProvider.Flags().Uint64(common.RateLimitRequestPerSecondFlag, 0, "Measuring the load relative to this number for feedback - per second - per chain - default unlimited. Given Y simultaneous relay calls, a value of X and will measure Y/X load rate") + cmdRPCProvider.Flags().Uint64(common.RateLimitRequestPerSecondFlag, 0, "Measuring the load relative to this number for feedback - per second - per chain - default unlimited. Given Y simultaneous relay calls, a value of X and will measure Y/X load rate.") common.AddRollingLogConfig(cmdRPCProvider) return cmdRPCProvider } From 810db1333787e581940cf5314e27bd9aa1f5cab5 Mon Sep 17 00:00:00 2001 From: leon mandel Date: Tue, 8 Oct 2024 15:17:39 +0200 Subject: [PATCH 34/47] add metric for load rate --- protocol/metrics/metrics_provider.go | 7 +++ protocol/metrics/metrics_provider_manager.go | 10 +++- protocol/rpcprovider/provider_load_manager.go | 15 +++-- protocol/rpcprovider/rpcprovider.go | 59 ++++++++++--------- protocol/rpcprovider/rpcprovider_server.go | 10 +++- .../init_lava_only_with_node_two_consumers.sh | 2 +- 6 files changed, 68 insertions(+), 35 deletions(-) diff --git a/protocol/metrics/metrics_provider.go b/protocol/metrics/metrics_provider.go index b5c3f0761b..e1c69c3610 100644 --- a/protocol/metrics/metrics_provider.go +++ b/protocol/metrics/metrics_provider.go @@ -22,6 +22,7 @@ type ProviderMetrics struct { totalRelaysServicedMetric *prometheus.CounterVec totalErroredMetric *prometheus.CounterVec consumerQoSMetric *prometheus.GaugeVec + loadRateMetric *prometheus.GaugeVec } func (pm *ProviderMetrics) AddRelay(consumerAddress string, cu uint64, qos *pairingtypes.QualityOfServiceReport) { @@ -49,6 +50,10 @@ func (pm *ProviderMetrics) AddRelay(consumerAddress string, cu uint64, qos *pair } } +func (pm *ProviderMetrics) SetLoadRate(loatRate float64) { + pm.loadRateMetric.WithLabelValues(pm.specID).Set(loatRate) +} + func (pm *ProviderMetrics) AddPayment(cu uint64) { if pm == nil { return @@ -72,6 +77,7 @@ func NewProviderMetrics(specID, apiInterface string, totalCUServicedMetric *prom totalRelaysServicedMetric *prometheus.CounterVec, totalErroredMetric *prometheus.CounterVec, consumerQoSMetric *prometheus.GaugeVec, + loadRateMetric *prometheus.GaugeVec, ) *ProviderMetrics { pm := &ProviderMetrics{ specID: specID, @@ -82,6 +88,7 @@ func NewProviderMetrics(specID, apiInterface string, totalCUServicedMetric *prom totalRelaysServicedMetric: totalRelaysServicedMetric, totalErroredMetric: totalErroredMetric, consumerQoSMetric: consumerQoSMetric, + loadRateMetric: loadRateMetric, } return pm } diff --git a/protocol/metrics/metrics_provider_manager.go b/protocol/metrics/metrics_provider_manager.go index b71b90e664..584346d750 100644 --- a/protocol/metrics/metrics_provider_manager.go +++ b/protocol/metrics/metrics_provider_manager.go @@ -41,6 +41,7 @@ type ProviderMetricsManager struct { endpointsHealthChecksOk uint64 relaysMonitors map[string]*RelaysMonitor relaysMonitorsLock sync.RWMutex + loadRateMetric *prometheus.GaugeVec } func NewProviderMetricsManager(networkAddress string) *ProviderMetricsManager { @@ -107,6 +108,11 @@ func NewProviderMetricsManager(networkAddress string) *ProviderMetricsManager { Help: "The total number of get latest block queries that succeeded by chainfetcher", }, []string{"spec"}) + loadRateMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "lava_provider_load_rate", + Help: "The load rate according to the load rate limit - Given Y simultaneous relay calls, a value of X and will measure Y/X load rate.", + }, []string{"spec"}) + fetchBlockSuccessMetric := prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "lava_provider_fetch_block_success", Help: "The total number of get specific block queries that succeeded by chainfetcher", @@ -141,6 +147,7 @@ func NewProviderMetricsManager(networkAddress string) *ProviderMetricsManager { prometheus.MustRegister(virtualEpochMetric) prometheus.MustRegister(endpointsHealthChecksOkMetric) prometheus.MustRegister(protocolVersionMetric) + prometheus.MustRegister(loadRateMetric) providerMetricsManager := &ProviderMetricsManager{ providerMetrics: map[string]*ProviderMetrics{}, @@ -161,6 +168,7 @@ func NewProviderMetricsManager(networkAddress string) *ProviderMetricsManager { endpointsHealthChecksOk: 1, protocolVersionMetric: protocolVersionMetric, relaysMonitors: map[string]*RelaysMonitor{}, + loadRateMetric: loadRateMetric, } http.Handle("/metrics", promhttp.Handler()) @@ -209,7 +217,7 @@ func (pme *ProviderMetricsManager) AddProviderMetrics(specID, apiInterface strin } if pme.getProviderMetric(specID, apiInterface) == nil { - providerMetric := NewProviderMetrics(specID, apiInterface, pme.totalCUServicedMetric, pme.totalCUPaidMetric, pme.totalRelaysServicedMetric, pme.totalErroredMetric, pme.consumerQoSMetric) + providerMetric := NewProviderMetrics(specID, apiInterface, pme.totalCUServicedMetric, pme.totalCUPaidMetric, pme.totalRelaysServicedMetric, pme.totalErroredMetric, pme.consumerQoSMetric, pme.loadRateMetric) pme.setProviderMetric(providerMetric) endpoint := fmt.Sprintf("/metrics/%s/%s/health", specID, apiInterface) diff --git a/protocol/rpcprovider/provider_load_manager.go b/protocol/rpcprovider/provider_load_manager.go index 9ed20a89c1..a69364a4a5 100644 --- a/protocol/rpcprovider/provider_load_manager.go +++ b/protocol/rpcprovider/provider_load_manager.go @@ -49,20 +49,25 @@ func (loadManager *ProviderLoadManager) getProviderLoad() string { return "" } activeRequests := loadManager.activeRequestsPerSecond.Load() - return strconv.FormatUint(activeRequests/rateLimitThreshold, 10) + return strconv.FormatFloat(float64(activeRequests/rateLimitThreshold), 'f', -1, 64) } -func (loadManager *ProviderLoadManager) applyProviderLoadMetadataToContextTrailer(ctx context.Context) { +func (loadManager *ProviderLoadManager) applyProviderLoadMetadataToContextTrailer(ctx context.Context) bool { provideRelayLoad := loadManager.getProviderLoad() if provideRelayLoad == "" { - return + return false } trailerMd := metadata.Pairs(chainlib.RpcProviderLoadRateHeader, provideRelayLoad) grpc.SetTrailer(ctx, trailerMd) + return true } -func (loadManager *ProviderLoadManager) addAndSetRelayLoadToContextTrailer(ctx context.Context) { +func (loadManager *ProviderLoadManager) addAndSetRelayLoadToContextTrailer(ctx context.Context) bool { loadManager.addRelayCall() - loadManager.applyProviderLoadMetadataToContextTrailer(ctx) + return loadManager.applyProviderLoadMetadataToContextTrailer(ctx) +} + +func (loadManager *ProviderLoadManager) getActiveRequestsPerSecond() uint64 { + return loadManager.activeRequestsPerSecond.Load() } diff --git a/protocol/rpcprovider/rpcprovider.go b/protocol/rpcprovider/rpcprovider.go index 77fc70a304..9b5419a88d 100644 --- a/protocol/rpcprovider/rpcprovider.go +++ b/protocol/rpcprovider/rpcprovider.go @@ -124,25 +124,26 @@ type RPCProvider struct { rpcProviderListeners map[string]*ProviderListener lock sync.Mutex // all of the following members need to be concurrency proof - providerMetricsManager *metrics.ProviderMetricsManager - rewardServer *rewardserver.RewardServer - privKey *btcec.PrivateKey - lavaChainID string - addr sdk.AccAddress - blockMemorySize uint64 - chainMutexes map[string]*sync.Mutex - parallelConnections uint - cache *performance.Cache - shardID uint // shardID is a flag that allows setting up multiple provider databases of the same chain - chainTrackers *common.SafeSyncMap[string, *chaintracker.ChainTracker] - relaysMonitorAggregator *metrics.RelaysMonitorAggregator - relaysHealthCheckEnabled bool - relaysHealthCheckInterval time.Duration - grpcHealthCheckEndpoint string - providerUniqueId string - staticProvider bool - staticSpecPath string - relayLoadLimit uint64 + providerMetricsManager *metrics.ProviderMetricsManager + rewardServer *rewardserver.RewardServer + privKey *btcec.PrivateKey + lavaChainID string + addr sdk.AccAddress + blockMemorySize uint64 + chainMutexes map[string]*sync.Mutex + parallelConnections uint + cache *performance.Cache + shardID uint // shardID is a flag that allows setting up multiple provider databases of the same chain + chainTrackers *common.SafeSyncMap[string, *chaintracker.ChainTracker] + relaysMonitorAggregator *metrics.RelaysMonitorAggregator + relaysHealthCheckEnabled bool + relaysHealthCheckInterval time.Duration + grpcHealthCheckEndpoint string + providerUniqueId string + staticProvider bool + staticSpecPath string + relayLoadLimit uint64 + providerLoadManagersPerChain map[string]*ProviderLoadManager } func (rpcp *RPCProvider) Start(options *rpcProviderStartOptions) (err error) { @@ -168,6 +169,7 @@ func (rpcp *RPCProvider) Start(options *rpcProviderStartOptions) (err error) { rpcp.staticProvider = options.staticProvider rpcp.staticSpecPath = options.staticSpecPath rpcp.relayLoadLimit = options.relayLoadLimit + rpcp.providerLoadManagersPerChain = make(map[string]*ProviderLoadManager) // single state tracker lavaChainFetcher := chainlib.NewLavaChainFetcher(ctx, options.clientCtx) @@ -310,16 +312,10 @@ func (rpcp *RPCProvider) SetupProviderEndpoints(rpcProviderEndpoints []*lavasess wg.Add(parallelJobs) disabledEndpoints := make(chan *lavasession.RPCProviderEndpoint, parallelJobs) // validate static spec configuration is used only on a single chain setup. - providerLoadManagersPerChain := make(map[string]*ProviderLoadManager) for _, rpcProviderEndpoint := range rpcProviderEndpoints { - providerLoadManager, keyExists := providerLoadManagersPerChain[rpcProviderEndpoint.ChainID] - if !keyExists { - providerLoadManager = NewProviderLoadManager(rpcp.relayLoadLimit) - providerLoadManagersPerChain[rpcProviderEndpoint.ChainID] = providerLoadManager - } setupEndpoint := func(rpcProviderEndpoint *lavasession.RPCProviderEndpoint, specValidator *SpecValidator) { defer wg.Done() - err := rpcp.SetupEndpoint(context.Background(), rpcProviderEndpoint, specValidator, providerLoadManager) + err := rpcp.SetupEndpoint(context.Background(), rpcProviderEndpoint, specValidator) if err != nil { rpcp.providerMetricsManager.SetDisabledChain(rpcProviderEndpoint.ChainID, rpcProviderEndpoint.ApiInterface) disabledEndpoints <- rpcProviderEndpoint @@ -348,7 +344,7 @@ func GetAllAddonsAndExtensionsFromNodeUrlSlice(nodeUrls []common.NodeUrl) *Provi return policy } -func (rpcp *RPCProvider) SetupEndpoint(ctx context.Context, rpcProviderEndpoint *lavasession.RPCProviderEndpoint, specValidator *SpecValidator, providerLoadManager *ProviderLoadManager) error { +func (rpcp *RPCProvider) SetupEndpoint(ctx context.Context, rpcProviderEndpoint *lavasession.RPCProviderEndpoint, specValidator *SpecValidator) error { err := rpcProviderEndpoint.Validate() if err != nil { return utils.LavaFormatError("[PANIC] panic severity critical error, aborting support for chain api due to invalid node url definition, continuing with others", err, utils.Attribute{Key: "endpoint", Value: rpcProviderEndpoint.String()}) @@ -411,6 +407,7 @@ func (rpcp *RPCProvider) SetupEndpoint(ctx context.Context, rpcProviderEndpoint utils.Attribute{Key: "Chain", Value: rpcProviderEndpoint.ChainID}, utils.Attribute{Key: "apiInterface", Value: apiInterface}) } + var providerLoadManager *ProviderLoadManager // in order to utilize shared resources between chains we need go routines with the same chain to wait for one another here chainCommonSetup := func() error { @@ -457,6 +454,14 @@ func (rpcp *RPCProvider) SetupEndpoint(ctx context.Context, rpcProviderEndpoint chainTracker = chainTrackerLoaded utils.LavaFormatDebug("reusing chain tracker", utils.Attribute{Key: "chain", Value: rpcProviderEndpoint.ChainID}) } + + // create provider load manager per chain ID + var keyExists bool + providerLoadManager, keyExists = rpcp.providerLoadManagersPerChain[rpcProviderEndpoint.ChainID] + if !keyExists { + providerLoadManager = NewProviderLoadManager(rpcp.relayLoadLimit) + rpcp.providerLoadManagersPerChain[rpcProviderEndpoint.ChainID] = providerLoadManager + } return nil } err = chainCommonSetup() diff --git a/protocol/rpcprovider/rpcprovider_server.go b/protocol/rpcprovider/rpcprovider_server.go index d4d4621b9a..bf4ee61fd2 100644 --- a/protocol/rpcprovider/rpcprovider_server.go +++ b/protocol/rpcprovider/rpcprovider_server.go @@ -159,6 +159,11 @@ func (rpcps *RPCProviderServer) initRelaysMonitor(ctx context.Context) { rpcps.relaysMonitor.Start(ctx) } +func (rpcps *RPCProviderServer) setLoadMetric() { + loadRate := rpcps.providerLoadManager.getActiveRequestsPerSecond() + rpcps.metrics.SetLoadRate(float64(loadRate)) +} + func (rpcps *RPCProviderServer) craftChainMessage() (chainMessage chainlib.ChainMessage, err error) { parsing, apiCollection, ok := rpcps.chainParser.GetParsingByTag(spectypes.FUNCTION_TAG_GET_BLOCKNUM) if !ok { @@ -184,8 +189,11 @@ func (rpcps *RPCProviderServer) craftChainMessage() (chainMessage chainlib.Chain // function used to handle relay requests from a consumer, it is called by a provider_listener by calling RegisterReceiver func (rpcps *RPCProviderServer) Relay(ctx context.Context, request *pairingtypes.RelayRequest) (*pairingtypes.RelayReply, error) { // count the number of simultaneous relay calls - rpcps.providerLoadManager.addAndSetRelayLoadToContextTrailer(ctx) + isLoadRateSet := rpcps.providerLoadManager.addAndSetRelayLoadToContextTrailer(ctx) defer func() { go rpcps.providerLoadManager.subtractRelayCall() }() + if isLoadRateSet { + go rpcps.setLoadMetric() + } trailerMd := metadata.Pairs(chainlib.RpcProviderUniqueIdHeader, rpcps.providerUniqueId) grpc.SetTrailer(ctx, trailerMd) if request.RelayData == nil || request.RelaySession == nil { diff --git a/scripts/pre_setups/init_lava_only_with_node_two_consumers.sh b/scripts/pre_setups/init_lava_only_with_node_two_consumers.sh index 2ebffd14ea..76da201899 100755 --- a/scripts/pre_setups/init_lava_only_with_node_two_consumers.sh +++ b/scripts/pre_setups/init_lava_only_with_node_two_consumers.sh @@ -53,7 +53,7 @@ screen -d -m -S provider1 bash -c "source ~/.bashrc; lavap rpcprovider \ $PROVIDER1_LISTENER LAV1 rest '$LAVA_REST' \ $PROVIDER1_LISTENER LAV1 tendermintrpc '$LAVA_RPC,$LAVA_RPC_WS' \ $PROVIDER1_LISTENER LAV1 grpc '$LAVA_GRPC' \ -$EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level trace --from servicer1 --chain-id lava --metrics-listen-address ":7776" 2>&1 | tee $LOGS_DIR/PROVIDER1.log" && sleep 0.25 +$EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level trace --from servicer1 --chain-id lava --rate-limit-requests-per-second 100 --metrics-listen-address ":7776" 2>&1 | tee $LOGS_DIR/PROVIDER1.log" && sleep 0.25 wait_next_block From 82f30201a06772d7dfe84e6a877150d967a009cd Mon Sep 17 00:00:00 2001 From: leon mandel Date: Tue, 8 Oct 2024 15:22:53 +0200 Subject: [PATCH 35/47] fix division to be float and not uint --- protocol/rpcprovider/provider_load_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocol/rpcprovider/provider_load_manager.go b/protocol/rpcprovider/provider_load_manager.go index a69364a4a5..7ddf452a08 100644 --- a/protocol/rpcprovider/provider_load_manager.go +++ b/protocol/rpcprovider/provider_load_manager.go @@ -49,7 +49,7 @@ func (loadManager *ProviderLoadManager) getProviderLoad() string { return "" } activeRequests := loadManager.activeRequestsPerSecond.Load() - return strconv.FormatFloat(float64(activeRequests/rateLimitThreshold), 'f', -1, 64) + return strconv.FormatFloat(float64(activeRequests)/float64(rateLimitThreshold), 'f', -1, 64) } func (loadManager *ProviderLoadManager) applyProviderLoadMetadataToContextTrailer(ctx context.Context) bool { From 14d8c6876060b212229c74f15f50b009873a569d Mon Sep 17 00:00:00 2001 From: leon mandel Date: Tue, 8 Oct 2024 15:23:20 +0200 Subject: [PATCH 36/47] roll back init lava only with node two consumers --- scripts/pre_setups/init_lava_only_with_node_two_consumers.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/pre_setups/init_lava_only_with_node_two_consumers.sh b/scripts/pre_setups/init_lava_only_with_node_two_consumers.sh index 76da201899..2ebffd14ea 100755 --- a/scripts/pre_setups/init_lava_only_with_node_two_consumers.sh +++ b/scripts/pre_setups/init_lava_only_with_node_two_consumers.sh @@ -53,7 +53,7 @@ screen -d -m -S provider1 bash -c "source ~/.bashrc; lavap rpcprovider \ $PROVIDER1_LISTENER LAV1 rest '$LAVA_REST' \ $PROVIDER1_LISTENER LAV1 tendermintrpc '$LAVA_RPC,$LAVA_RPC_WS' \ $PROVIDER1_LISTENER LAV1 grpc '$LAVA_GRPC' \ -$EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level trace --from servicer1 --chain-id lava --rate-limit-requests-per-second 100 --metrics-listen-address ":7776" 2>&1 | tee $LOGS_DIR/PROVIDER1.log" && sleep 0.25 +$EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level trace --from servicer1 --chain-id lava --metrics-listen-address ":7776" 2>&1 | tee $LOGS_DIR/PROVIDER1.log" && sleep 0.25 wait_next_block From 2d6128976d41a5e0efd6676edc69c7618a88dfcf Mon Sep 17 00:00:00 2001 From: leon mandel Date: Tue, 8 Oct 2024 17:34:36 +0200 Subject: [PATCH 37/47] fix load metric --- protocol/rpcprovider/provider_load_manager.go | 13 +++++++------ protocol/rpcprovider/rpcprovider_server.go | 4 ++-- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/protocol/rpcprovider/provider_load_manager.go b/protocol/rpcprovider/provider_load_manager.go index 7ddf452a08..52df38b92d 100644 --- a/protocol/rpcprovider/provider_load_manager.go +++ b/protocol/rpcprovider/provider_load_manager.go @@ -40,25 +40,26 @@ func (loadManager *ProviderLoadManager) subtractRelayCall() { loadManager.activeRequestsPerSecond.Add(^uint64(0)) } -func (loadManager *ProviderLoadManager) getProviderLoad() string { +func (loadManager *ProviderLoadManager) getProviderLoad() float64 { if loadManager == nil { - return "" + return 0 } rateLimitThreshold := loadManager.rateLimitThreshold.Load() if rateLimitThreshold == 0 { - return "" + return 0 } activeRequests := loadManager.activeRequestsPerSecond.Load() - return strconv.FormatFloat(float64(activeRequests)/float64(rateLimitThreshold), 'f', -1, 64) + return float64(activeRequests) / float64(rateLimitThreshold) } func (loadManager *ProviderLoadManager) applyProviderLoadMetadataToContextTrailer(ctx context.Context) bool { provideRelayLoad := loadManager.getProviderLoad() - if provideRelayLoad == "" { + if provideRelayLoad == 0 { return false } + formattedProviderLoad := strconv.FormatFloat(provideRelayLoad, 'f', -1, 64) - trailerMd := metadata.Pairs(chainlib.RpcProviderLoadRateHeader, provideRelayLoad) + trailerMd := metadata.Pairs(chainlib.RpcProviderLoadRateHeader, formattedProviderLoad) grpc.SetTrailer(ctx, trailerMd) return true } diff --git a/protocol/rpcprovider/rpcprovider_server.go b/protocol/rpcprovider/rpcprovider_server.go index bf4ee61fd2..003dcbca4a 100644 --- a/protocol/rpcprovider/rpcprovider_server.go +++ b/protocol/rpcprovider/rpcprovider_server.go @@ -160,8 +160,8 @@ func (rpcps *RPCProviderServer) initRelaysMonitor(ctx context.Context) { } func (rpcps *RPCProviderServer) setLoadMetric() { - loadRate := rpcps.providerLoadManager.getActiveRequestsPerSecond() - rpcps.metrics.SetLoadRate(float64(loadRate)) + loadRate := rpcps.providerLoadManager.getProviderLoad() + rpcps.metrics.SetLoadRate(loadRate) } func (rpcps *RPCProviderServer) craftChainMessage() (chainMessage chainlib.ChainMessage, err error) { From 97f72eca0c6d2b385e70d12d5bdeccdfa59dbda7 Mon Sep 17 00:00:00 2001 From: leon mandel Date: Wed, 9 Oct 2024 10:38:22 +0200 Subject: [PATCH 38/47] merge main --- scripts/pre_setups/init_lava_only_with_node_two_consumers.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/pre_setups/init_lava_only_with_node_two_consumers.sh b/scripts/pre_setups/init_lava_only_with_node_two_consumers.sh index 2ebffd14ea..0a2ba349c1 100755 --- a/scripts/pre_setups/init_lava_only_with_node_two_consumers.sh +++ b/scripts/pre_setups/init_lava_only_with_node_two_consumers.sh @@ -53,7 +53,7 @@ screen -d -m -S provider1 bash -c "source ~/.bashrc; lavap rpcprovider \ $PROVIDER1_LISTENER LAV1 rest '$LAVA_REST' \ $PROVIDER1_LISTENER LAV1 tendermintrpc '$LAVA_RPC,$LAVA_RPC_WS' \ $PROVIDER1_LISTENER LAV1 grpc '$LAVA_GRPC' \ -$EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level trace --from servicer1 --chain-id lava --metrics-listen-address ":7776" 2>&1 | tee $LOGS_DIR/PROVIDER1.log" && sleep 0.25 +$EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level trace --from servicer1 --chain-id lava --rate-limit-requests-per-second 10 --metrics-listen-address ":7776" 2>&1 | tee $LOGS_DIR/PROVIDER1.log" && sleep 0.25 wait_next_block From 454db086f1284b4cd961f59bc1dfd27f58721160 Mon Sep 17 00:00:00 2001 From: Leon Magma Date: Wed, 9 Oct 2024 10:52:36 +0200 Subject: [PATCH 39/47] Update protocol/chainlib/common.go Co-authored-by: Elad Gildnur <6321801+shleikes@users.noreply.github.com> --- protocol/chainlib/common.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocol/chainlib/common.go b/protocol/chainlib/common.go index 80291cf608..e580c9f440 100644 --- a/protocol/chainlib/common.go +++ b/protocol/chainlib/common.go @@ -30,7 +30,7 @@ const ( relayMsgLogMaxChars = 200 RPCProviderNodeAddressHash = "Lava-Provider-Node-Address-Hash" RPCProviderNodeExtension = "Lava-Provider-Node-Extension" - RpcProviderLoadRateHeader = "Lava-Provider-load-rate" + RpcProviderLoadRateHeader = "Lava-Provider-Load-Rate" RpcProviderUniqueIdHeader = "Lava-Provider-Unique-Id" WebSocketExtension = "websocket" ) From d4c725876098a7549689dfcd128244ce81da6a7d Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Thu, 10 Oct 2024 13:05:07 +0200 Subject: [PATCH 40/47] fix load calculation --- protocol/chainlib/common.go | 1 + protocol/rpcconsumer/rpcconsumer_server.go | 20 ++++++----- protocol/rpcprovider/provider_load_manager.go | 36 ++++++------------- protocol/rpcprovider/rpcprovider_server.go | 20 ++++++----- 4 files changed, 33 insertions(+), 44 deletions(-) diff --git a/protocol/chainlib/common.go b/protocol/chainlib/common.go index e580c9f440..c6f3a53a3e 100644 --- a/protocol/chainlib/common.go +++ b/protocol/chainlib/common.go @@ -36,6 +36,7 @@ const ( ) var ( + TrailersToAddToHeaderResponse = []string{RPCProviderNodeExtension, RpcProviderLoadRateHeader} InvalidResponses = []string{"null", "", "nil", "undefined"} FailedSendingSubscriptionToClients = sdkerrors.New("failed Sending Subscription To Clients", 1015, "Failed Sending Subscription To Clients connection might have been closed by the user") NoActiveSubscriptionFound = sdkerrors.New("failed finding an active subscription on provider side", 1016, "no active subscriptions for hashed params.") diff --git a/protocol/rpcconsumer/rpcconsumer_server.go b/protocol/rpcconsumer/rpcconsumer_server.go index cfa4bbb6a1..ff35e82d32 100644 --- a/protocol/rpcconsumer/rpcconsumer_server.go +++ b/protocol/rpcconsumer/rpcconsumer_server.go @@ -1261,14 +1261,17 @@ func (rpccs *RPCConsumerServer) HandleDirectiveHeadersForMessage(chainMessage ch chainMessage.SetForceCacheRefresh(ok) } -func (rpccs *RPCConsumerServer) getMetadataFromRelayTrailer(metadataHeader string, relayResult *common.RelayResult) { - trailerValue := relayResult.ProviderTrailer.Get(metadataHeader) - if len(trailerValue) > 0 { - extensionMD := pairingtypes.Metadata{ - Name: metadataHeader, - Value: trailerValue[0], +// Iterating over metadataHeaders adding each trailer that fits the header if found to relayResult.Relay.Metadata +func (rpccs *RPCConsumerServer) getMetadataFromRelayTrailer(metadataHeaders []string, relayResult *common.RelayResult) { + for _, metadataHeader := range metadataHeaders { + trailerValue := relayResult.ProviderTrailer.Get(metadataHeader) + if len(trailerValue) > 0 { + extensionMD := pairingtypes.Metadata{ + Name: metadataHeader, + Value: trailerValue[0], + } + relayResult.Reply.Metadata = append(relayResult.Reply.Metadata, extensionMD) } - relayResult.Reply.Metadata = append(relayResult.Reply.Metadata, extensionMD) } } @@ -1344,8 +1347,7 @@ func (rpccs *RPCConsumerServer) appendHeadersToRelayResult(ctx context.Context, } // fetch trailer information from the provider by using the provider trailer field. - rpccs.getMetadataFromRelayTrailer(chainlib.RPCProviderNodeExtension, relayResult) - rpccs.getMetadataFromRelayTrailer(chainlib.RpcProviderLoadRateHeader, relayResult) + rpccs.getMetadataFromRelayTrailer(chainlib.TrailersToAddToHeaderResponse, relayResult) directiveHeaders := protocolMessage.GetDirectiveHeaders() _, debugRelays := directiveHeaders[common.LAVA_DEBUG_RELAY] diff --git a/protocol/rpcprovider/provider_load_manager.go b/protocol/rpcprovider/provider_load_manager.go index 52df38b92d..034fc17584 100644 --- a/protocol/rpcprovider/provider_load_manager.go +++ b/protocol/rpcprovider/provider_load_manager.go @@ -26,13 +26,6 @@ func NewProviderLoadManager(rateLimitThreshold uint64) *ProviderLoadManager { return loadManager } -func (loadManager *ProviderLoadManager) addRelayCall() { - if loadManager == nil { - return - } - loadManager.activeRequestsPerSecond.Add(1) -} - func (loadManager *ProviderLoadManager) subtractRelayCall() { if loadManager == nil { return @@ -40,35 +33,26 @@ func (loadManager *ProviderLoadManager) subtractRelayCall() { loadManager.activeRequestsPerSecond.Add(^uint64(0)) } -func (loadManager *ProviderLoadManager) getProviderLoad() float64 { - if loadManager == nil { - return 0 - } +func (loadManager *ProviderLoadManager) getProviderLoad(activeRequests uint64) float64 { rateLimitThreshold := loadManager.rateLimitThreshold.Load() if rateLimitThreshold == 0 { return 0 } - activeRequests := loadManager.activeRequestsPerSecond.Load() return float64(activeRequests) / float64(rateLimitThreshold) } -func (loadManager *ProviderLoadManager) applyProviderLoadMetadataToContextTrailer(ctx context.Context) bool { - provideRelayLoad := loadManager.getProviderLoad() +// Add relay count, calculate current load +func (loadManager *ProviderLoadManager) addAndSetRelayLoadToContextTrailer(ctx context.Context) float64 { + if loadManager == nil { + return 0 + } + activeRequestsPerSecond := loadManager.activeRequestsPerSecond.Add(1) + provideRelayLoad := loadManager.getProviderLoad(activeRequestsPerSecond) if provideRelayLoad == 0 { - return false + return provideRelayLoad } formattedProviderLoad := strconv.FormatFloat(provideRelayLoad, 'f', -1, 64) - trailerMd := metadata.Pairs(chainlib.RpcProviderLoadRateHeader, formattedProviderLoad) grpc.SetTrailer(ctx, trailerMd) - return true -} - -func (loadManager *ProviderLoadManager) addAndSetRelayLoadToContextTrailer(ctx context.Context) bool { - loadManager.addRelayCall() - return loadManager.applyProviderLoadMetadataToContextTrailer(ctx) -} - -func (loadManager *ProviderLoadManager) getActiveRequestsPerSecond() uint64 { - return loadManager.activeRequestsPerSecond.Load() + return provideRelayLoad } diff --git a/protocol/rpcprovider/rpcprovider_server.go b/protocol/rpcprovider/rpcprovider_server.go index 003dcbca4a..78197a77b5 100644 --- a/protocol/rpcprovider/rpcprovider_server.go +++ b/protocol/rpcprovider/rpcprovider_server.go @@ -159,9 +159,8 @@ func (rpcps *RPCProviderServer) initRelaysMonitor(ctx context.Context) { rpcps.relaysMonitor.Start(ctx) } -func (rpcps *RPCProviderServer) setLoadMetric() { - loadRate := rpcps.providerLoadManager.getProviderLoad() - rpcps.metrics.SetLoadRate(loadRate) +func (rpcps *RPCProviderServer) setLoadMetric(currentLoad float64) { + rpcps.metrics.SetLoadRate(currentLoad) } func (rpcps *RPCProviderServer) craftChainMessage() (chainMessage chainlib.ChainMessage, err error) { @@ -188,12 +187,15 @@ func (rpcps *RPCProviderServer) craftChainMessage() (chainMessage chainlib.Chain // function used to handle relay requests from a consumer, it is called by a provider_listener by calling RegisterReceiver func (rpcps *RPCProviderServer) Relay(ctx context.Context, request *pairingtypes.RelayRequest) (*pairingtypes.RelayReply, error) { - // count the number of simultaneous relay calls - isLoadRateSet := rpcps.providerLoadManager.addAndSetRelayLoadToContextTrailer(ctx) - defer func() { go rpcps.providerLoadManager.subtractRelayCall() }() - if isLoadRateSet { - go rpcps.setLoadMetric() - } + // get the number of simultaneous relay calls + currentLoad := rpcps.providerLoadManager.addAndSetRelayLoadToContextTrailer(ctx) + defer func() { + // add load metric and subtract the load at the end of the relay using a routine. + go func() { + rpcps.providerLoadManager.subtractRelayCall() + rpcps.setLoadMetric(currentLoad) + }() + }() trailerMd := metadata.Pairs(chainlib.RpcProviderUniqueIdHeader, rpcps.providerUniqueId) grpc.SetTrailer(ctx, trailerMd) if request.RelayData == nil || request.RelaySession == nil { From 273a32ac3ce154858bde9c066de88ae03f6424b9 Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Thu, 10 Oct 2024 13:08:59 +0200 Subject: [PATCH 41/47] tidy code --- protocol/rpcprovider/rpcprovider.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/protocol/rpcprovider/rpcprovider.go b/protocol/rpcprovider/rpcprovider.go index 9b5419a88d..61d71678af 100644 --- a/protocol/rpcprovider/rpcprovider.go +++ b/protocol/rpcprovider/rpcprovider.go @@ -407,8 +407,6 @@ func (rpcp *RPCProvider) SetupEndpoint(ctx context.Context, rpcProviderEndpoint utils.Attribute{Key: "Chain", Value: rpcProviderEndpoint.ChainID}, utils.Attribute{Key: "apiInterface", Value: apiInterface}) } - var providerLoadManager *ProviderLoadManager - // in order to utilize shared resources between chains we need go routines with the same chain to wait for one another here chainCommonSetup := func() error { rpcp.chainMutexes[chainID].Lock() @@ -456,11 +454,9 @@ func (rpcp *RPCProvider) SetupEndpoint(ctx context.Context, rpcProviderEndpoint } // create provider load manager per chain ID - var keyExists bool - providerLoadManager, keyExists = rpcp.providerLoadManagersPerChain[rpcProviderEndpoint.ChainID] + _, keyExists := rpcp.providerLoadManagersPerChain[rpcProviderEndpoint.ChainID] if !keyExists { - providerLoadManager = NewProviderLoadManager(rpcp.relayLoadLimit) - rpcp.providerLoadManagersPerChain[rpcProviderEndpoint.ChainID] = providerLoadManager + rpcp.providerLoadManagersPerChain[rpcProviderEndpoint.ChainID] = NewProviderLoadManager(rpcp.relayLoadLimit) } return nil } @@ -497,7 +493,11 @@ func (rpcp *RPCProvider) SetupEndpoint(ctx context.Context, rpcProviderEndpoint utils.LavaFormatTrace("Creating provider node subscription manager", utils.LogAttr("rpcProviderEndpoint", rpcProviderEndpoint)) providerNodeSubscriptionManager = chainlib.NewProviderNodeSubscriptionManager(chainRouter, chainParser, rpcProviderServer, rpcp.privKey) } - rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rpcp.rewardServer, providerSessionManager, reliabilityManager, rpcp.privKey, rpcp.cache, chainRouter, rpcp.providerStateTracker, rpcp.addr, rpcp.lavaChainID, DEFAULT_ALLOWED_MISSING_CU, providerMetrics, relaysMonitor, providerNodeSubscriptionManager, rpcp.staticProvider, providerLoadManager) + loadManager, found := rpcp.providerLoadManagersPerChain[rpcProviderEndpoint.ChainID] + if !found { + utils.LavaFormatError("Failed creating provider load manager", nil, utils.LogAttr("chainId", rpcProviderEndpoint.ChainID)) + } + rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rpcp.rewardServer, providerSessionManager, reliabilityManager, rpcp.privKey, rpcp.cache, chainRouter, rpcp.providerStateTracker, rpcp.addr, rpcp.lavaChainID, DEFAULT_ALLOWED_MISSING_CU, providerMetrics, relaysMonitor, providerNodeSubscriptionManager, rpcp.staticProvider, loadManager) // set up grpc listener var listener *ProviderListener func() { From 6fb7276695ec91038824217ea10e66647b0e337e Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Thu, 10 Oct 2024 13:10:44 +0200 Subject: [PATCH 42/47] changing rate limit to 1k --- scripts/pre_setups/init_lava_only_with_node_two_consumers.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/pre_setups/init_lava_only_with_node_two_consumers.sh b/scripts/pre_setups/init_lava_only_with_node_two_consumers.sh index 0a2ba349c1..514c8acfbd 100755 --- a/scripts/pre_setups/init_lava_only_with_node_two_consumers.sh +++ b/scripts/pre_setups/init_lava_only_with_node_two_consumers.sh @@ -53,7 +53,7 @@ screen -d -m -S provider1 bash -c "source ~/.bashrc; lavap rpcprovider \ $PROVIDER1_LISTENER LAV1 rest '$LAVA_REST' \ $PROVIDER1_LISTENER LAV1 tendermintrpc '$LAVA_RPC,$LAVA_RPC_WS' \ $PROVIDER1_LISTENER LAV1 grpc '$LAVA_GRPC' \ -$EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level trace --from servicer1 --chain-id lava --rate-limit-requests-per-second 10 --metrics-listen-address ":7776" 2>&1 | tee $LOGS_DIR/PROVIDER1.log" && sleep 0.25 +$EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level trace --from servicer1 --chain-id lava --rate-limit-requests-per-second 1000 --metrics-listen-address ":7776" 2>&1 | tee $LOGS_DIR/PROVIDER1.log" && sleep 0.25 wait_next_block From 272e676a259a3490ed253f15c09dfbfcbc62f863 Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Thu, 10 Oct 2024 14:21:15 +0200 Subject: [PATCH 43/47] fix bug --- protocol/metrics/metrics_provider.go | 7 +++++-- protocol/rpcprovider/rpcprovider_server.go | 6 +----- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/protocol/metrics/metrics_provider.go b/protocol/metrics/metrics_provider.go index e1c69c3610..e1cf39d477 100644 --- a/protocol/metrics/metrics_provider.go +++ b/protocol/metrics/metrics_provider.go @@ -50,8 +50,11 @@ func (pm *ProviderMetrics) AddRelay(consumerAddress string, cu uint64, qos *pair } } -func (pm *ProviderMetrics) SetLoadRate(loatRate float64) { - pm.loadRateMetric.WithLabelValues(pm.specID).Set(loatRate) +func (pm *ProviderMetrics) SetLoadRate(loadRate float64) { + if pm == nil { + return + } + pm.loadRateMetric.WithLabelValues(pm.specID).Set(loadRate) } func (pm *ProviderMetrics) AddPayment(cu uint64) { diff --git a/protocol/rpcprovider/rpcprovider_server.go b/protocol/rpcprovider/rpcprovider_server.go index 78197a77b5..3f7dc8599c 100644 --- a/protocol/rpcprovider/rpcprovider_server.go +++ b/protocol/rpcprovider/rpcprovider_server.go @@ -159,10 +159,6 @@ func (rpcps *RPCProviderServer) initRelaysMonitor(ctx context.Context) { rpcps.relaysMonitor.Start(ctx) } -func (rpcps *RPCProviderServer) setLoadMetric(currentLoad float64) { - rpcps.metrics.SetLoadRate(currentLoad) -} - func (rpcps *RPCProviderServer) craftChainMessage() (chainMessage chainlib.ChainMessage, err error) { parsing, apiCollection, ok := rpcps.chainParser.GetParsingByTag(spectypes.FUNCTION_TAG_GET_BLOCKNUM) if !ok { @@ -193,7 +189,7 @@ func (rpcps *RPCProviderServer) Relay(ctx context.Context, request *pairingtypes // add load metric and subtract the load at the end of the relay using a routine. go func() { rpcps.providerLoadManager.subtractRelayCall() - rpcps.setLoadMetric(currentLoad) + rpcps.metrics.SetLoadRate(currentLoad) }() }() trailerMd := metadata.Pairs(chainlib.RpcProviderUniqueIdHeader, rpcps.providerUniqueId) From 42486aceb7085f7e101f88339875eb9fd5797bf3 Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Thu, 10 Oct 2024 15:47:20 +0200 Subject: [PATCH 44/47] fix pr --- protocol/rpcprovider/provider_load_manager.go | 9 +++------ ...ers.sh => init_lava_only_with_node_rate_limit.sh} | 12 +++--------- 2 files changed, 6 insertions(+), 15 deletions(-) rename scripts/pre_setups/{init_lava_only_with_node_two_consumers.sh => init_lava_only_with_node_rate_limit.sh} (79%) diff --git a/protocol/rpcprovider/provider_load_manager.go b/protocol/rpcprovider/provider_load_manager.go index 034fc17584..e4a3b97d41 100644 --- a/protocol/rpcprovider/provider_load_manager.go +++ b/protocol/rpcprovider/provider_load_manager.go @@ -11,7 +11,7 @@ import ( ) type ProviderLoadManager struct { - rateLimitThreshold atomic.Uint64 + rateLimitThreshold uint64 activeRequestsPerSecond atomic.Uint64 } @@ -19,10 +19,7 @@ func NewProviderLoadManager(rateLimitThreshold uint64) *ProviderLoadManager { if rateLimitThreshold == 0 { return nil } - loadManager := &ProviderLoadManager{} - - loadManager.rateLimitThreshold.Store(rateLimitThreshold) - + loadManager := &ProviderLoadManager{rateLimitThreshold: rateLimitThreshold} return loadManager } @@ -34,7 +31,7 @@ func (loadManager *ProviderLoadManager) subtractRelayCall() { } func (loadManager *ProviderLoadManager) getProviderLoad(activeRequests uint64) float64 { - rateLimitThreshold := loadManager.rateLimitThreshold.Load() + rateLimitThreshold := loadManager.rateLimitThreshold if rateLimitThreshold == 0 { return 0 } diff --git a/scripts/pre_setups/init_lava_only_with_node_two_consumers.sh b/scripts/pre_setups/init_lava_only_with_node_rate_limit.sh similarity index 79% rename from scripts/pre_setups/init_lava_only_with_node_two_consumers.sh rename to scripts/pre_setups/init_lava_only_with_node_rate_limit.sh index 514c8acfbd..4d35705eb0 100755 --- a/scripts/pre_setups/init_lava_only_with_node_two_consumers.sh +++ b/scripts/pre_setups/init_lava_only_with_node_rate_limit.sh @@ -20,7 +20,7 @@ echo "[Test Setup] sleeping 20 seconds for node to finish setup (if its not enou sleep 5 wait_for_lava_node_to_start -GASPRICE="0.000000001ulava" +GASPRICE="0.00002ulava" lavad tx gov submit-legacy-proposal spec-add ./cookbook/specs/ibc.json,./cookbook/specs/cosmoswasm.json,./cookbook/specs/tendermint.json,./cookbook/specs/cosmossdk.json,./cookbook/specs/cosmossdk_45.json,./cookbook/specs/cosmossdk_full.json,./cookbook/specs/ethermint.json,./cookbook/specs/ethereum.json,./cookbook/specs/cosmoshub.json,./cookbook/specs/lava.json,./cookbook/specs/osmosis.json,./cookbook/specs/fantom.json,./cookbook/specs/celo.json,./cookbook/specs/optimism.json,./cookbook/specs/arbitrum.json,./cookbook/specs/starknet.json,./cookbook/specs/aptos.json,./cookbook/specs/juno.json,./cookbook/specs/polygon.json,./cookbook/specs/evmos.json,./cookbook/specs/base.json,./cookbook/specs/canto.json,./cookbook/specs/sui.json,./cookbook/specs/solana.json,./cookbook/specs/bsc.json,./cookbook/specs/axelar.json,./cookbook/specs/avalanche.json,./cookbook/specs/fvm.json --lava-dev-test -y --from alice --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE & wait_next_block wait_next_block @@ -42,8 +42,6 @@ PROVIDER1_LISTENER="127.0.0.1:2220" lavad tx subscription buy DefaultPlan $(lavad keys show user1 -a) -y --from user1 --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE wait_next_block -lavad tx subscription buy DefaultPlan $(lavad keys show user2 -a) -y --from user2 --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE -wait_next_block lavad tx pairing stake-provider "LAV1" $PROVIDERSTAKE "$PROVIDER1_LISTENER,1" 1 $(operator_address) -y --from servicer1 --provider-moniker "dummyMoniker" --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE sleep_until_next_epoch @@ -53,17 +51,13 @@ screen -d -m -S provider1 bash -c "source ~/.bashrc; lavap rpcprovider \ $PROVIDER1_LISTENER LAV1 rest '$LAVA_REST' \ $PROVIDER1_LISTENER LAV1 tendermintrpc '$LAVA_RPC,$LAVA_RPC_WS' \ $PROVIDER1_LISTENER LAV1 grpc '$LAVA_GRPC' \ -$EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level trace --from servicer1 --chain-id lava --rate-limit-requests-per-second 1000 --metrics-listen-address ":7776" 2>&1 | tee $LOGS_DIR/PROVIDER1.log" && sleep 0.25 +$EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level debug --from servicer1 --rate-limit-requests-per-second 10 --chain-id lava --metrics-listen-address ":7776" 2>&1 | tee $LOGS_DIR/PROVIDER1.log" && sleep 0.25 wait_next_block screen -d -m -S consumers bash -c "source ~/.bashrc; lavap rpcconsumer \ 127.0.0.1:3360 LAV1 rest 127.0.0.1:3361 LAV1 tendermintrpc 127.0.0.1:3362 LAV1 grpc \ -$EXTRA_PORTAL_FLAGS --geolocation 1 --log_level trace --from user1 --chain-id lava --add-api-method-metrics --allow-insecure-provider-dialing --metrics-listen-address ":7779" 2>&1 | tee $LOGS_DIR/CONSUMERS.log" && sleep 0.25 - -screen -d -m -S consumers2 bash -c "source ~/.bashrc; lavap rpcconsumer \ -127.0.0.1:3350 LAV1 rest 127.0.0.1:3351 LAV1 tendermintrpc 127.0.0.1:3352 LAV1 grpc \ -$EXTRA_PORTAL_FLAGS --geolocation 1 --log_level trace --from user2 --chain-id lava --add-api-method-metrics --allow-insecure-provider-dialing --metrics-listen-address ":7773" 2>&1 | tee $LOGS_DIR/CONSUMERS2.log" && sleep 0.25 +$EXTRA_PORTAL_FLAGS --geolocation 1 --log_level debug --from user1 --chain-id lava --add-api-method-metrics --allow-insecure-provider-dialing --metrics-listen-address ":7779" 2>&1 | tee $LOGS_DIR/CONSUMERS.log" && sleep 0.25 echo "--- setting up screens done ---" screen -ls \ No newline at end of file From 974dbcb55b4332fb7e5770721d23b6db9c3fd1e6 Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Sun, 27 Oct 2024 16:35:23 +0100 Subject: [PATCH 45/47] v4 --- protocol/rpcprovider/provider_load_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocol/rpcprovider/provider_load_manager.go b/protocol/rpcprovider/provider_load_manager.go index e4a3b97d41..f21b221ad9 100644 --- a/protocol/rpcprovider/provider_load_manager.go +++ b/protocol/rpcprovider/provider_load_manager.go @@ -5,7 +5,7 @@ import ( "strconv" "sync/atomic" - "github.com/lavanet/lava/v3/protocol/chainlib" + "github.com/lavanet/lava/v4/protocol/chainlib" grpc "google.golang.org/grpc" "google.golang.org/grpc/metadata" ) From feeb373ce6f59ea62180fedc957ea56ab3545193 Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Sun, 27 Oct 2024 16:48:10 +0100 Subject: [PATCH 46/47] fix pr --- protocol/rpcprovider/rpcprovider.go | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/protocol/rpcprovider/rpcprovider.go b/protocol/rpcprovider/rpcprovider.go index d6f8c32da6..d8f6fcff2a 100644 --- a/protocol/rpcprovider/rpcprovider.go +++ b/protocol/rpcprovider/rpcprovider.go @@ -143,7 +143,7 @@ type RPCProvider struct { staticProvider bool staticSpecPath string relayLoadLimit uint64 - providerLoadManagersPerChain map[string]*ProviderLoadManager + providerLoadManagersPerChain *common.SafeSyncMap[string, *ProviderLoadManager] } func (rpcp *RPCProvider) Start(options *rpcProviderStartOptions) (err error) { @@ -169,8 +169,7 @@ func (rpcp *RPCProvider) Start(options *rpcProviderStartOptions) (err error) { rpcp.staticProvider = options.staticProvider rpcp.staticSpecPath = options.staticSpecPath rpcp.relayLoadLimit = options.relayLoadLimit - rpcp.providerLoadManagersPerChain = make(map[string]*ProviderLoadManager) - + rpcp.providerLoadManagersPerChain = &common.SafeSyncMap[string, *ProviderLoadManager]{} // single state tracker lavaChainFetcher := chainlib.NewLavaChainFetcher(ctx, options.clientCtx) providerStateTracker, err := statetracker.NewProviderStateTracker(ctx, options.txFactory, options.clientCtx, lavaChainFetcher, rpcp.providerMetricsManager) @@ -454,10 +453,7 @@ func (rpcp *RPCProvider) SetupEndpoint(ctx context.Context, rpcProviderEndpoint } // create provider load manager per chain ID - _, keyExists := rpcp.providerLoadManagersPerChain[rpcProviderEndpoint.ChainID] - if !keyExists { - rpcp.providerLoadManagersPerChain[rpcProviderEndpoint.ChainID] = NewProviderLoadManager(rpcp.relayLoadLimit) - } + rpcp.providerLoadManagersPerChain.LoadOrStore(rpcProviderEndpoint.ChainID, NewProviderLoadManager(rpcp.relayLoadLimit)) return nil } err = chainCommonSetup() @@ -493,8 +489,9 @@ func (rpcp *RPCProvider) SetupEndpoint(ctx context.Context, rpcProviderEndpoint utils.LavaFormatTrace("Creating provider node subscription manager", utils.LogAttr("rpcProviderEndpoint", rpcProviderEndpoint)) providerNodeSubscriptionManager = chainlib.NewProviderNodeSubscriptionManager(chainRouter, chainParser, rpcProviderServer, rpcp.privKey) } - loadManager, found := rpcp.providerLoadManagersPerChain[rpcProviderEndpoint.ChainID] - if !found { + + loadManager, found, err := rpcp.providerLoadManagersPerChain.Load(rpcProviderEndpoint.ChainID) + if !found || err != nil { utils.LavaFormatError("Failed creating provider load manager", nil, utils.LogAttr("chainId", rpcProviderEndpoint.ChainID)) } rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rpcp.rewardServer, providerSessionManager, reliabilityManager, rpcp.privKey, rpcp.cache, chainRouter, rpcp.providerStateTracker, rpcp.addr, rpcp.lavaChainID, DEFAULT_ALLOWED_MISSING_CU, providerMetrics, relaysMonitor, providerNodeSubscriptionManager, rpcp.staticProvider, loadManager) From 51894ce42d526f4ad16cf73a29bcb1a5cbaddaa2 Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Sun, 27 Oct 2024 16:56:33 +0100 Subject: [PATCH 47/47] fix --- protocol/rpcprovider/rpcprovider.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/protocol/rpcprovider/rpcprovider.go b/protocol/rpcprovider/rpcprovider.go index d8f6fcff2a..22b9ed8b65 100644 --- a/protocol/rpcprovider/rpcprovider.go +++ b/protocol/rpcprovider/rpcprovider.go @@ -407,6 +407,7 @@ func (rpcp *RPCProvider) SetupEndpoint(ctx context.Context, rpcProviderEndpoint utils.Attribute{Key: "apiInterface", Value: apiInterface}) } // in order to utilize shared resources between chains we need go routines with the same chain to wait for one another here + var loadManager *ProviderLoadManager chainCommonSetup := func() error { rpcp.chainMutexes[chainID].Lock() defer rpcp.chainMutexes[chainID].Unlock() @@ -453,7 +454,10 @@ func (rpcp *RPCProvider) SetupEndpoint(ctx context.Context, rpcProviderEndpoint } // create provider load manager per chain ID - rpcp.providerLoadManagersPerChain.LoadOrStore(rpcProviderEndpoint.ChainID, NewProviderLoadManager(rpcp.relayLoadLimit)) + loadManager, _, err = rpcp.providerLoadManagersPerChain.LoadOrStore(rpcProviderEndpoint.ChainID, NewProviderLoadManager(rpcp.relayLoadLimit)) + if err != nil { + utils.LavaFormatError("Failed LoadOrStore providerLoadManagersPerChain", err, utils.LogAttr("chainId", rpcProviderEndpoint.ChainID), utils.LogAttr("rpcp.relayLoadLimit", rpcp.relayLoadLimit)) + } return nil } err = chainCommonSetup() @@ -489,11 +493,6 @@ func (rpcp *RPCProvider) SetupEndpoint(ctx context.Context, rpcProviderEndpoint utils.LavaFormatTrace("Creating provider node subscription manager", utils.LogAttr("rpcProviderEndpoint", rpcProviderEndpoint)) providerNodeSubscriptionManager = chainlib.NewProviderNodeSubscriptionManager(chainRouter, chainParser, rpcProviderServer, rpcp.privKey) } - - loadManager, found, err := rpcp.providerLoadManagersPerChain.Load(rpcProviderEndpoint.ChainID) - if !found || err != nil { - utils.LavaFormatError("Failed creating provider load manager", nil, utils.LogAttr("chainId", rpcProviderEndpoint.ChainID)) - } rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rpcp.rewardServer, providerSessionManager, reliabilityManager, rpcp.privKey, rpcp.cache, chainRouter, rpcp.providerStateTracker, rpcp.addr, rpcp.lavaChainID, DEFAULT_ALLOWED_MISSING_CU, providerMetrics, relaysMonitor, providerNodeSubscriptionManager, rpcp.staticProvider, loadManager) // set up grpc listener var listener *ProviderListener