From 90db93df645db65b5da122d9df2c05eba2a58b37 Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Thu, 30 May 2024 19:32:30 +0200 Subject: [PATCH 01/33] PRT-adding-errored-and-blocked-providers-to-headers --- protocol/common/endpoints.go | 2 ++ .../lavasession/consumer_session_manager.go | 7 ++++++- .../consumer_session_manager_test.go | 2 +- protocol/rpcconsumer/rpcconsumer.go | 2 +- protocol/rpcconsumer/rpcconsumer_server.go | 21 +++++++++++-------- 5 files changed, 22 insertions(+), 12 deletions(-) diff --git a/protocol/common/endpoints.go b/protocol/common/endpoints.go index ab133cebf9..5c8297a51e 100644 --- a/protocol/common/endpoints.go +++ b/protocol/common/endpoints.go @@ -29,6 +29,8 @@ const ( RELAY_TIMEOUT_HEADER_NAME = "lava-relay-timeout" EXTENSION_OVERRIDE_HEADER_NAME = "lava-extension" FORCE_CACHE_REFRESH_HEADER_NAME = "lava-force-cache-refresh" + GET_BLOCKED_PROVIDERS = "lava-get-blocked-providers" + GET_ERRORED_PROVIDERS = "lava-get-errored-providers" // send http request to /lava/health to see if the process is up - (ret code 200) DEFAULT_HEALTH_PATH = "/lava/health" MAXIMUM_ALLOWED_TIMEOUT_EXTEND_MULTIPLIER_BY_THE_CONSUMER = 4 diff --git a/protocol/lavasession/consumer_session_manager.go b/protocol/lavasession/consumer_session_manager.go index 2fe8924a3a..8fb12a86c0 100644 --- a/protocol/lavasession/consumer_session_manager.go +++ b/protocol/lavasession/consumer_session_manager.go @@ -54,6 +54,7 @@ type ConsumerSessionManager struct { pairingPurge map[string]*ConsumerSessionsWithProvider providerOptimizer ProviderOptimizer consumerMetricsManager *metrics.ConsumerMetricsManager + consumerPublicAddress string } // this is being read in multiple locations and but never changes so no need to lock. 
@@ -321,6 +322,9 @@ func (csm *ConsumerSessionManager) resetValidAddresses(addon string, extensions utils.LavaFormatWarning("Provider pairing list is empty, resetting state.", nil, utils.Attribute{Key: "addon", Value: addon}, utils.Attribute{Key: "extensions", Value: extensions}) } else { utils.LavaFormatWarning("No providers for asked addon or extension, list is empty after trying to reset", nil, utils.Attribute{Key: "addon", Value: addon}, utils.Attribute{Key: "extensions", Value: extensions}) + if addon == "" && len(extensions) == 0 { + utils.LavaFormatError("User subscription might have expired or not purchased properly, pairing list is empty after reset", nil, utils.LogAttr("consumer_address", csm.consumerPublicAddress)) + } } csm.numberOfResets += 1 } @@ -1050,10 +1054,11 @@ func (csm *ConsumerSessionManager) GenerateReconnectCallback(consumerSessionsWit } } -func NewConsumerSessionManager(rpcEndpoint *RPCEndpoint, providerOptimizer ProviderOptimizer, consumerMetricsManager *metrics.ConsumerMetricsManager, reporter metrics.Reporter) *ConsumerSessionManager { +func NewConsumerSessionManager(rpcEndpoint *RPCEndpoint, providerOptimizer ProviderOptimizer, consumerMetricsManager *metrics.ConsumerMetricsManager, reporter metrics.Reporter, consumerPublicAddress string) *ConsumerSessionManager { csm := &ConsumerSessionManager{ reportedProviders: NewReportedProviders(reporter), consumerMetricsManager: consumerMetricsManager, + consumerPublicAddress: consumerPublicAddress, } csm.rpcEndpoint = rpcEndpoint csm.providerOptimizer = providerOptimizer diff --git a/protocol/lavasession/consumer_session_manager_test.go b/protocol/lavasession/consumer_session_manager_test.go index 7eb124bdb8..8ef109fe8a 100644 --- a/protocol/lavasession/consumer_session_manager_test.go +++ b/protocol/lavasession/consumer_session_manager_test.go @@ -158,7 +158,7 @@ var grpcListener = "localhost:0" func CreateConsumerSessionManager() *ConsumerSessionManager { rand.InitRandomSeed() baseLatency := 
common.AverageWorldLatency / 2 // we want performance to be half our timeout or better - return NewConsumerSessionManager(&RPCEndpoint{"stub", "stub", "stub", false, "/", 0}, provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, 0, baseLatency, 1), nil, nil) + return NewConsumerSessionManager(&RPCEndpoint{"stub", "stub", "stub", false, "/", 0}, provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, 0, baseLatency, 1), nil, nil, "lava@test") } func TestMain(m *testing.M) { diff --git a/protocol/rpcconsumer/rpcconsumer.go b/protocol/rpcconsumer/rpcconsumer.go index 976fe6b13b..bfc1700074 100644 --- a/protocol/rpcconsumer/rpcconsumer.go +++ b/protocol/rpcconsumer/rpcconsumer.go @@ -282,7 +282,7 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt } // Register For Updates - consumerSessionManager := lavasession.NewConsumerSessionManager(rpcEndpoint, optimizer, consumerMetricsManager, consumerReportsManager) + consumerSessionManager := lavasession.NewConsumerSessionManager(rpcEndpoint, optimizer, consumerMetricsManager, consumerReportsManager, consumerAddr.String()) rpcc.consumerStateTracker.RegisterConsumerSessionManagerForPairingUpdates(ctx, consumerSessionManager) var relaysMonitor *metrics.RelaysMonitor diff --git a/protocol/rpcconsumer/rpcconsumer_server.go b/protocol/rpcconsumer/rpcconsumer_server.go index 74d931a85a..1a8b5755fb 100644 --- a/protocol/rpcconsumer/rpcconsumer_server.go +++ b/protocol/rpcconsumer/rpcconsumer_server.go @@ -961,12 +961,11 @@ func (rpccs *RPCConsumerServer) LavaDirectiveHeaders(metadata []pairingtypes.Met name := strings.ToLower(metaElement.Name) switch name { case common.BLOCK_PROVIDERS_ADDRESSES_HEADER_NAME: - headerDirectives[name] = metaElement.Value case common.RELAY_TIMEOUT_HEADER_NAME: - headerDirectives[name] = metaElement.Value case common.EXTENSION_OVERRIDE_HEADER_NAME: - headerDirectives[name] = metaElement.Value case 
common.FORCE_CACHE_REFRESH_HEADER_NAME: + case common.GET_BLOCKED_PROVIDERS: + case common.GET_ERRORED_PROVIDERS: headerDirectives[name] = metaElement.Value default: metadataRet = append(metadataRet, metaElement) @@ -1011,13 +1010,17 @@ func (rpccs *RPCConsumerServer) appendHeadersToRelayResult(ctx context.Context, } metadataReply := []pairingtypes.Metadata{} // add the provider that responded - if relayResult.GetProvider() != "" { - metadataReply = append(metadataReply, - pairingtypes.Metadata{ - Name: common.PROVIDER_ADDRESS_HEADER_NAME, - Value: relayResult.GetProvider(), - }) + + providerAddress := relayResult.GetProvider() + if providerAddress == "" { + providerAddress = "Cached" } + metadataReply = append(metadataReply, + pairingtypes.Metadata{ + Name: common.PROVIDER_ADDRESS_HEADER_NAME, + Value: providerAddress, + }) + // add the relay retried count if protocolErrors > 0 { metadataReply = append(metadataReply, From 691ff481f75f136987bac5b365222f35d8da7a2e Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Thu, 30 May 2024 23:26:14 +0200 Subject: [PATCH 02/33] changing log level from info to debug on spammy logs --- protocol/chainlib/jsonRPC.go | 2 +- protocol/chainlib/tendermintRPC.go | 2 +- protocol/lavasession/consumer_session_manager.go | 2 +- protocol/lavasession/single_consumer_session.go | 2 +- protocol/metrics/metricsService.go | 6 +++--- protocol/metrics/rpcconsumerlogs.go | 2 +- protocol/rpcconsumer/relay_errors.go | 2 +- protocol/statetracker/updaters/version_updater.go | 2 +- 8 files changed, 10 insertions(+), 10 deletions(-) diff --git a/protocol/chainlib/jsonRPC.go b/protocol/chainlib/jsonRPC.go index 9eccc7536b..66e331d46d 100644 --- a/protocol/chainlib/jsonRPC.go +++ b/protocol/chainlib/jsonRPC.go @@ -112,7 +112,7 @@ func (apip *JsonRPCChainParser) ParseMsg(url string, data []byte, connectionType // Check api is supported and save it in nodeMsg apiCont, err := apip.getSupportedApi(msg.Method, connectionType, internalPath) if err != nil { - 
utils.LavaFormatInfo("getSupportedApi jsonrpc failed", utils.LogAttr("method", msg.Method), utils.LogAttr("error", err)) + utils.LavaFormatDebug("getSupportedApi jsonrpc failed", utils.LogAttr("method", msg.Method), utils.LogAttr("error", err)) return nil, err } diff --git a/protocol/chainlib/tendermintRPC.go b/protocol/chainlib/tendermintRPC.go index 0fb92fff57..617e011d5a 100644 --- a/protocol/chainlib/tendermintRPC.go +++ b/protocol/chainlib/tendermintRPC.go @@ -139,7 +139,7 @@ func (apip *TendermintChainParser) ParseMsg(urlPath string, data []byte, connect // Check api is supported and save it in nodeMsg apiCont, err := apip.getSupportedApi(msg.Method, connectionType) if err != nil { - utils.LavaFormatInfo("getSupportedApi jsonrpc failed", utils.LogAttr("method", msg.Method), utils.LogAttr("error", err)) + utils.LavaFormatDebug("getSupportedApi jsonrpc failed", utils.LogAttr("method", msg.Method), utils.LogAttr("error", err)) return nil, err } diff --git a/protocol/lavasession/consumer_session_manager.go b/protocol/lavasession/consumer_session_manager.go index 8fb12a86c0..a7802cab79 100644 --- a/protocol/lavasession/consumer_session_manager.go +++ b/protocol/lavasession/consumer_session_manager.go @@ -713,7 +713,7 @@ func (csm *ConsumerSessionManager) getValidConsumerSessionsWithProvider(ignoredP // Fetch provider addresses providerAddresses, err := csm.getValidProviderAddresses(ignoredProviders.providers, cuNeededForSession, requestedBlock, addon, extensions, stateful) if err != nil { - utils.LavaFormatInfo(csm.rpcEndpoint.ChainID+" could not get a provider addresses", utils.LogAttr("error", err)) + utils.LavaFormatDebug(csm.rpcEndpoint.ChainID+" could not get a provider addresses", utils.LogAttr("error", err)) return nil, err } diff --git a/protocol/lavasession/single_consumer_session.go b/protocol/lavasession/single_consumer_session.go index 561720c40d..4e6d2c9365 100644 --- a/protocol/lavasession/single_consumer_session.go +++ 
b/protocol/lavasession/single_consumer_session.go @@ -57,7 +57,7 @@ func (cs *SingleConsumerSession) CalculateQoS(latency, expectedLatency time.Dura downtimePercentage, scaledAvailabilityScore := CalculateAvailabilityScore(&cs.QoSInfo) cs.QoSInfo.LastQoSReport.Availability = scaledAvailabilityScore if sdk.OneDec().GT(cs.QoSInfo.LastQoSReport.Availability) { - utils.LavaFormatInfo("QoS Availability report", utils.Attribute{Key: "Availability", Value: cs.QoSInfo.LastQoSReport.Availability}, utils.Attribute{Key: "down percent", Value: downtimePercentage}) + utils.LavaFormatDebug("QoS Availability report", utils.Attribute{Key: "Availability", Value: cs.QoSInfo.LastQoSReport.Availability}, utils.Attribute{Key: "down percent", Value: downtimePercentage}) } latencyScore := sdk.MinDec(sdk.OneDec(), sdk.NewDecFromInt(sdk.NewInt(int64(expectedLatency))).Quo(sdk.NewDecFromInt(sdk.NewInt(int64(latency))))) diff --git a/protocol/metrics/metricsService.go b/protocol/metrics/metricsService.go index 8ba7cfb8e7..7ce99601c2 100644 --- a/protocol/metrics/metricsService.go +++ b/protocol/metrics/metricsService.go @@ -56,11 +56,11 @@ func NewMetricService() *MetricService { select { case <-ticker.C: { - utils.LavaFormatInfo("metric triggered, sending accumulated data to server") + utils.LavaFormatDebug("metric triggered, sending accumulated data to server") result.SendEachProjectMetricData() } case metricData := <-mChannel: - utils.LavaFormatInfo("reading from chanel data") + utils.LavaFormatDebug("reading from chanel data") result.storeAggregatedData(metricData) } } @@ -73,7 +73,7 @@ func (m *MetricService) SendData(data RelayMetrics) { select { case m.MetricsChannel <- data: default: - utils.LavaFormatInfo("channel is full, ignoring these data", + utils.LavaFormatDebug("channel is full, ignoring these data", utils.Attribute{Key: "projectHash", Value: data.ProjectHash}, utils.Attribute{Key: "chainId", Value: data.ChainID}, utils.Attribute{Key: "apiType", Value: data.APIType}, diff 
--git a/protocol/metrics/rpcconsumerlogs.go b/protocol/metrics/rpcconsumerlogs.go index 14083a87f0..2fb8202a5a 100644 --- a/protocol/metrics/rpcconsumerlogs.go +++ b/protocol/metrics/rpcconsumerlogs.go @@ -118,7 +118,7 @@ func (rpccl *RPCConsumerLogs) AnalyzeWebSocketErrorAndWriteMessage(c *websocket. if err != nil { errMessage := err.Error() if strings.Contains(errMessage, webSocketCloseMessage) { - utils.LavaFormatInfo("Websocket connection closed by the user, " + errMessage) + utils.LavaFormatDebug("Websocket connection closed by the user, " + errMessage) return } rpccl.LogRequestAndResponse(rpcType+" ws msg", true, "ws", c.LocalAddr().String(), string(msg), "", msgSeed, timeTaken, err) diff --git a/protocol/rpcconsumer/relay_errors.go b/protocol/rpcconsumer/relay_errors.go index 0adae35b9c..14a15a7b4e 100644 --- a/protocol/rpcconsumer/relay_errors.go +++ b/protocol/rpcconsumer/relay_errors.go @@ -77,7 +77,7 @@ func (r *RelayErrors) GetBestErrorMessageForUser() RelayError { if bestIndex != -1 { // Return the chosen error. // Print info for the consumer to know which errors happened - utils.LavaFormatInfo("Failed all relays", utils.LogAttr("error_map", errorMap)) + utils.LavaFormatDebug("Failed all relays", utils.LogAttr("error_map", errorMap)) return r.relayErrors[bestIndex] } // if we didn't manage to find any index return all. 
diff --git a/protocol/statetracker/updaters/version_updater.go b/protocol/statetracker/updaters/version_updater.go index 86463e4902..f38c5bc053 100644 --- a/protocol/statetracker/updaters/version_updater.go +++ b/protocol/statetracker/updaters/version_updater.go @@ -57,7 +57,7 @@ func (vu *VersionUpdater) updateInner(latestBlock int64) { utils.LavaFormatError("could not get version when updated, did not update protocol version and needed to", err) return } - utils.LavaFormatInfo("Protocol version has been fetched successfully!", + utils.LavaFormatDebug("Protocol version has been fetched successfully!", utils.Attribute{Key: "old_version", Value: vu.LastKnownVersion}, utils.Attribute{Key: "new_version", Value: version}) // if no error, set the last known version. From 263df9a2caedc849d7245f4dd50791fb6e40b24b Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Sun, 2 Jun 2024 15:08:33 +0200 Subject: [PATCH 03/33] fixing protocol test --- protocol/integration/protocol_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocol/integration/protocol_test.go b/protocol/integration/protocol_test.go index 02dcf6b2c0..6db264e764 100644 --- a/protocol/integration/protocol_test.go +++ b/protocol/integration/protocol_test.go @@ -156,7 +156,7 @@ func createRpcConsumer(t *testing.T, ctx context.Context, specId string, apiInte _, averageBlockTime, _, _ := chainParser.ChainBlockStats() baseLatency := common.AverageWorldLatency / 2 optimizer := provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, averageBlockTime, baseLatency, 2) - consumerSessionManager := lavasession.NewConsumerSessionManager(rpcEndpoint, optimizer, nil, nil) + consumerSessionManager := lavasession.NewConsumerSessionManager(rpcEndpoint, optimizer, nil, nil, "test") consumerSessionManager.UpdateAllProviders(epoch, pairingList) consumerConsistency := rpcconsumer.NewConsumerConsistency(specId) From e3002df5f24dd9d75e6cd0be4b9878cd3229fbf9 Mon Sep 17 00:00:00 2001 From: Ran 
Mishael Date: Sun, 2 Jun 2024 15:08:57 +0200 Subject: [PATCH 04/33] removing remove extensions flow from the csm. managing it on rpcconsumer --- .../lavasession/consumer_session_manager.go | 37 ++++--------------- 1 file changed, 8 insertions(+), 29 deletions(-) diff --git a/protocol/lavasession/consumer_session_manager.go b/protocol/lavasession/consumer_session_manager.go index a7802cab79..868be931a1 100644 --- a/protocol/lavasession/consumer_session_manager.go +++ b/protocol/lavasession/consumer_session_manager.go @@ -323,7 +323,7 @@ func (csm *ConsumerSessionManager) resetValidAddresses(addon string, extensions } else { utils.LavaFormatWarning("No providers for asked addon or extension, list is empty after trying to reset", nil, utils.Attribute{Key: "addon", Value: addon}, utils.Attribute{Key: "extensions", Value: extensions}) if addon == "" && len(extensions) == 0 { - utils.LavaFormatError("User subscription might have expired or not purchased properly, pairing list is empty after reset", nil, utils.LogAttr("consumer_address", csm.consumerPublicAddress)) + utils.LavaFormatError("User subscription might have expired or not purchased properly, pairing list is empty after reset.", nil, utils.LogAttr("consumer_address", csm.consumerPublicAddress)) } } csm.numberOfResets += 1 @@ -354,6 +354,12 @@ func (csm *ConsumerSessionManager) validatePairingListNotEmpty(addon string, ext return numberOfResets } +func (csm *ConsumerSessionManager) getValidAddressesLengthForExtensionOrAddon(addon string, extensions []string) int { + csm.lock.RLock() + defer csm.lock.RUnlock() + return len(csm.getValidAddresses(addon, extensions)) +} + func (csm *ConsumerSessionManager) getSessionWithProviderOrError(usedProviders UsedProvidersInf, tempIgnoredProviders *ignoredProviders, cuNeededForSession uint64, requestedBlock int64, addon string, extensionNames []string, stateful uint32, virtualEpoch uint64) (sessionWithProviderMap SessionWithProviderMap, err error) { sessionWithProviderMap, err 
= csm.getValidConsumerSessionsWithProvider(tempIgnoredProviders, cuNeededForSession, requestedBlock, addon, extensionNames, stateful, virtualEpoch) if err != nil { @@ -362,24 +368,7 @@ func (csm *ConsumerSessionManager) getSessionWithProviderOrError(usedProviders U var errOnRetry error sessionWithProviderMap, errOnRetry = csm.tryGetConsumerSessionWithProviderFromBlockedProviderList(tempIgnoredProviders, cuNeededForSession, requestedBlock, addon, extensionNames, stateful, virtualEpoch, usedProviders) if errOnRetry != nil { - // we validate currently used providers are 0 meaning we didn't find a valid extension provider - // so we don't return an invalid error while waiting for a reply for a valid provider. - // in the case we do not have any relays sent at the moment we get a provider from the regular list - if usedProviders.CurrentlyUsed() == 0 && PairingListEmptyError.Is(errOnRetry) && (len(extensionNames) > 0) { - var errGetRegularProvider error - emptyExtensionNames := []string{} - sessionWithProviderMap, errGetRegularProvider = csm.getValidConsumerSessionsWithProvider(tempIgnoredProviders, cuNeededForSession, requestedBlock, addon, emptyExtensionNames, stateful, virtualEpoch) - if errGetRegularProvider != nil { - return nil, err // return original error (getValidConsumerSessionsWithProvider) - } - for key := range sessionWithProviderMap { - sessionWithProviderMap[key].RemoveExtensions = true - } - // print a warning in case we got a provider who does not support that addon or extension. 
- utils.LavaFormatWarning("No Providers For Addon Or Extension, using regular provider for relay", errOnRetry, utils.LogAttr("addon", addon), utils.LogAttr("extensions", extensionNames), utils.LogAttr("providers_chosen", sessionWithProviderMap)) - } else { - return nil, err // return original error (getValidConsumerSessionsWithProvider) - } + return nil, err // return original error (getValidConsumerSessionsWithProvider) } } else { return nil, err @@ -426,15 +415,6 @@ func (csm *ConsumerSessionManager) GetSessions(ctx context.Context, cuNeededForS sessions := make(ConsumerSessionsMap, wantedSession) for { for providerAddress, sessionWithProvider := range sessionWithProviderMap { - // adding a protection when using RemoveAddonsAndExtensions to use only one session. - // we can get here if we wanted 3 archive and got 2 only because one couldn't connect, - // so we tried getting more sessions and got a regular provider due to no pairings available. - // in that case just return the current sessions that we do have. - if sessionWithProvider.RemoveExtensions && len(sessions) >= 1 { - utils.LavaFormatDebug("Too many sessions when using RemoveAddonAndExtensions session", utils.LogAttr("sessions", sessions), utils.LogAttr("wanted_to_add", sessionWithProvider)) - // in that case we just return the sessions we already have. - return sessions, nil - } // Extract values from session with provider consumerSessionsWithProvider := sessionWithProvider.SessionsWithProvider sessionEpoch := sessionWithProvider.CurrentEpoch @@ -531,7 +511,6 @@ func (csm *ConsumerSessionManager) GetSessions(ctx context.Context, cuNeededForS Session: consumerSession, Epoch: sessionEpoch, ReportedProviders: reportedProviders, - RemoveExtensions: sessionWithProvider.RemoveExtensions, } // adding qos summery for error parsing. 
From d5c538e021b250033870ee3ea8ca0d7216e9f94e Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Sun, 2 Jun 2024 15:09:11 +0200 Subject: [PATCH 05/33] removing deprecated types --- protocol/lavasession/consumer_types.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/protocol/lavasession/consumer_types.go b/protocol/lavasession/consumer_types.go index 911fbb6b3b..55e8f98af1 100644 --- a/protocol/lavasession/consumer_types.go +++ b/protocol/lavasession/consumer_types.go @@ -63,7 +63,6 @@ type SessionInfo struct { QoSSummeryResult sdk.Dec // using ComputeQoS to get the total QOS Epoch uint64 ReportedProviders []*pairingtypes.ReportedProvider - RemoveExtensions bool // used when we can't find a provider for an addon or extension and we use a regular provider instead } type ConsumerSessionsMap map[string]*SessionInfo @@ -113,7 +112,6 @@ type Endpoint struct { type SessionWithProvider struct { SessionsWithProvider *ConsumerSessionsWithProvider CurrentEpoch uint64 - RemoveExtensions bool // used when we can't find a provider for an addon or extension and we use a regular provider instead } type SessionWithProviderMap map[string]*SessionWithProvider From 01a2fb05a6e509b5a16b7ea272bc328f35da5172 Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Sun, 2 Jun 2024 15:09:38 +0200 Subject: [PATCH 06/33] adding new flag to determine if session degradation is allowed --- protocol/rpcconsumer/relay_processor.go | 40 ++++++++++++++++--------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/protocol/rpcconsumer/relay_processor.go b/protocol/rpcconsumer/relay_processor.go index 6a9b41d238..1fc0721b28 100644 --- a/protocol/rpcconsumer/relay_processor.go +++ b/protocol/rpcconsumer/relay_processor.go @@ -7,6 +7,7 @@ import ( "net/http" "strings" "sync" + "sync/atomic" sdktypes "github.com/cosmos/cosmos-sdk/types" "github.com/lavanet/lava/protocol/chainlib" @@ -52,20 +53,31 @@ func NewRelayProcessor(ctx context.Context, usedProviders *lavasession.UsedProvi } type
RelayProcessor struct { - usedProviders *lavasession.UsedProviders - responses chan *relayResponse - requiredSuccesses int - nodeResponseErrors RelayErrors - protocolResponseErrors RelayErrors - successResults []common.RelayResult - lock sync.RWMutex - chainMessage chainlib.ChainMessage - guid uint64 - selection Selection - consumerConsistency *ConsumerConsistency - dappID string - consumerIp string - skipDataReliability bool + usedProviders *lavasession.UsedProviders + responses chan *relayResponse + requiredSuccesses int + nodeResponseErrors RelayErrors + protocolResponseErrors RelayErrors + successResults []common.RelayResult + lock sync.RWMutex + chainMessage chainlib.ChainMessage + guid uint64 + selection Selection + consumerConsistency *ConsumerConsistency + dappID string + consumerIp string + skipDataReliability bool + allowSessionDegradation uint32 // used in the scenario where extension was previously used. +} + +// true if we never got an extension. (default value) +func (rp *RelayProcessor) GetAllowSessionDegradation() bool { + return atomic.LoadUint32(&rp.allowSessionDegradation) == 0 +} + +// in case we had an extension and managed to get a session successfully, we prevent session degradation. +func (rp *RelayProcessor) SetAllowSessionDegradation() { + atomic.StoreUint32(&rp.allowSessionDegradation, 1) } func (rp *RelayProcessor) setSkipDataReliability(val bool) { From f45ad67225d2c5d1bf58d2101cf772f20fa9678e Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Sun, 2 Jun 2024 15:10:04 +0200 Subject: [PATCH 07/33] extension disabling flow is now on rpc consumer server which makes more sense. 
--- protocol/rpcconsumer/rpcconsumer_server.go | 31 ++++++++++++++-------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/protocol/rpcconsumer/rpcconsumer_server.go b/protocol/rpcconsumer/rpcconsumer_server.go index 1a8b5755fb..befe4e62e8 100644 --- a/protocol/rpcconsumer/rpcconsumer_server.go +++ b/protocol/rpcconsumer/rpcconsumer_server.go @@ -551,22 +551,31 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider( usedProviders := relayProcessor.GetUsedProviders() sessions, err := rpccs.consumerSessionManager.GetSessions(ctx, chainlib.GetComputeUnits(chainMessage), usedProviders, reqBlock, addon, extensions, chainlib.GetStateful(chainMessage), virtualEpoch) if err != nil { - if lavasession.PairingListEmptyError.Is(err) && (addon != "" || len(extensions) > 0) { - err = utils.LavaFormatError("No Providers For Addon", err, utils.LogAttr("addon", addon), utils.LogAttr("extensions", extensions), utils.LogAttr("userIp", consumerIp)) + if lavasession.PairingListEmptyError.Is(err) { + if addon != "" { + return utils.LavaFormatError("No Providers For Addon", err, utils.LogAttr("addon", addon), utils.LogAttr("extensions", extensions), utils.LogAttr("userIp", consumerIp)) + } else if len(extensions) > 0 && relayProcessor.GetAllowSessionDegradation() { // if we have no providers for that extension, use a regular provider, otherwise return the extension results + sessions, err = rpccs.consumerSessionManager.GetSessions(ctx, chainlib.GetComputeUnits(chainMessage), usedProviders, reqBlock, addon, []*spectypes.Extension{}, chainlib.GetStateful(chainMessage), virtualEpoch) + if err != nil { + return err + } + relayProcessor.setSkipDataReliability(true) // disabling data reliability when disabling extensions. 
+ relayRequestData.Extensions = []string{} + } else { + return err + } + } else { + return err } - return err + } + + // making sure next get sessions wont use regular providers + if len(extensions) > 0 { + relayProcessor.SetAllowSessionDegradation() } // Iterate over the sessions map for providerPublicAddress, sessionInfo := range sessions { - // in case we need to remove extensions from relay request data so the providers will get a normal relay. - if sessionInfo.RemoveExtensions { - if len(sessions) > 1 { - utils.LavaFormatError("Should not have more than one session when using RemoveExtensions", nil, utils.LogAttr("sessions", sessions)) - } - relayProcessor.setSkipDataReliability(true) // disabling data reliability when disabling extensions. - relayRequestData.Extensions = []string{} - } // Launch a separate goroutine for each session go func(providerPublicAddress string, sessionInfo *lavasession.SessionInfo) { // add ticker launch metrics From 64a0130c41cb581b3a9db96cbff3312ae704de87 Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Sun, 2 Jun 2024 15:32:26 +0200 Subject: [PATCH 08/33] adding more info when failing to fetch pairing. 
--- protocol/statetracker/updaters/state_query.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/protocol/statetracker/updaters/state_query.go b/protocol/statetracker/updaters/state_query.go index 266178228a..284d925f6f 100644 --- a/protocol/statetracker/updaters/state_query.go +++ b/protocol/statetracker/updaters/state_query.go @@ -156,6 +156,13 @@ func (csq *ConsumerStateQuery) GetPairing(ctx context.Context, chainID string, l } csq.lastChainID = chainID csq.ResponsesCache.SetWithTTL(PairingRespKey+chainID, pairingResp, 1, DefaultTimeToLiveExpiration) + if len(pairingResp.Providers) == 0 { + utils.LavaFormatError("Chain returned empty provider list, check node connection and consumer subscription status", nil, + utils.LogAttr("chainId", chainID), + utils.LogAttr("epoch", pairingResp.CurrentEpoch), + utils.LogAttr("consumer_address", csq.clientCtx.FromAddress.String()), + ) + } return pairingResp.Providers, pairingResp.CurrentEpoch, pairingResp.BlockOfNextPairing, nil } From a99c3c2e41204e43cb31ce2927f43e749cbd8208 Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Sun, 2 Jun 2024 16:43:56 +0200 Subject: [PATCH 09/33] fix test --- .../lavasession/consumer_session_manager_test.go | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/protocol/lavasession/consumer_session_manager_test.go b/protocol/lavasession/consumer_session_manager_test.go index 8ef109fe8a..f097557e91 100644 --- a/protocol/lavasession/consumer_session_manager_test.go +++ b/protocol/lavasession/consumer_session_manager_test.go @@ -88,18 +88,6 @@ func TestHappyFlow(t *testing.T) { } } -func TestExtensionDoesNotExistOnPairingList(t *testing.T) { - ctx := context.Background() - csm := CreateConsumerSessionManager() - pairingList := createPairingList("", true) - err := csm.UpdateAllProviders(firstEpochHeight, pairingList) // update the providers. 
- require.NoError(t, err) - ext := []*spectypes.Extension{{Name: "test_non_existing_ex", Rule: &spectypes.Rule{Block: 555}, CuMultiplier: 5}} - _, err = csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", ext, common.NO_STATE, 0) // get a session - // if we got a session successfully we should get no error. - require.NoError(t, err) -} - func getDelayedAddress() string { delayedServerAddress := "127.0.0.1:3335" // because grpcListener is random we might have overlap. in that case just change the port. From c007eba46c81626c0cd56a541429c5b5983ec5e7 Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Sun, 2 Jun 2024 17:31:00 +0200 Subject: [PATCH 10/33] fix test --- protocol/integration/protocol_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocol/integration/protocol_test.go b/protocol/integration/protocol_test.go index 6db264e764..358a9025a2 100644 --- a/protocol/integration/protocol_test.go +++ b/protocol/integration/protocol_test.go @@ -440,7 +440,7 @@ func TestConsumerProviderWithProviders(t *testing.T) { seenError := false statuses := map[int]struct{}{} for i := 0; i <= 100; i++ { - client := http.Client{Timeout: 500 * time.Millisecond} + client := http.Client{} req, err := http.NewRequest("GET", "http://"+consumerListenAddress+"/status", nil) require.NoError(t, err) From df3fe7c41da116b9791ad222e1ee2019ec3ced53 Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Sun, 2 Jun 2024 18:13:57 +0200 Subject: [PATCH 11/33] adding comment --- protocol/statetracker/emergency_tracker.go | 1 + 1 file changed, 1 insertion(+) diff --git a/protocol/statetracker/emergency_tracker.go b/protocol/statetracker/emergency_tracker.go index ebb1557673..0f0b916c0e 100644 --- a/protocol/statetracker/emergency_tracker.go +++ b/protocol/statetracker/emergency_tracker.go @@ -56,6 +56,7 @@ func (cs *EmergencyTracker) UpdateEpoch(epoch uint64) { cs.lock.Lock() defer cs.lock.Unlock() + // checking if we already parsed that epoch. 
if epoch <= cs.latestEpoch { return } From 7960535b8e3f87df53f61381e170db39824767a0 Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Sun, 2 Jun 2024 18:14:14 +0200 Subject: [PATCH 12/33] register for pairing updates now requires chain id --- protocol/statetracker/consumer_state_tracker.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/protocol/statetracker/consumer_state_tracker.go b/protocol/statetracker/consumer_state_tracker.go index d51a30fb6c..9b6ed6cf12 100644 --- a/protocol/statetracker/consumer_state_tracker.go +++ b/protocol/statetracker/consumer_state_tracker.go @@ -49,14 +49,13 @@ func NewConsumerStateTracker(ctx context.Context, txFactory tx.Factory, clientCt disableConflictTransactions: disableConflictTransactions, } - cst.RegisterForPairingUpdates(ctx, emergencyTracker) err = cst.RegisterForDowntimeParamsUpdates(ctx, emergencyTracker) return cst, err } func (cst *ConsumerStateTracker) RegisterConsumerSessionManagerForPairingUpdates(ctx context.Context, consumerSessionManager *lavasession.ConsumerSessionManager) { // register this CSM to get the updated pairing list when a new epoch starts - pairingUpdater := updaters.NewPairingUpdater(cst.stateQuery) + pairingUpdater := updaters.NewPairingUpdater(cst.stateQuery, consumerSessionManager.RPCEndpoint().ChainID) pairingUpdaterRaw := cst.StateTracker.RegisterForUpdates(ctx, pairingUpdater) pairingUpdater, ok := pairingUpdaterRaw.(*updaters.PairingUpdater) if !ok { @@ -68,8 +67,8 @@ func (cst *ConsumerStateTracker) RegisterConsumerSessionManagerForPairingUpdates } } -func (cst *ConsumerStateTracker) RegisterForPairingUpdates(ctx context.Context, pairingUpdatable updaters.PairingUpdatable) { - pairingUpdater := updaters.NewPairingUpdater(cst.stateQuery) +func (cst *ConsumerStateTracker) RegisterForPairingUpdates(ctx context.Context, pairingUpdatable updaters.PairingUpdatable, specId string) { + pairingUpdater := updaters.NewPairingUpdater(cst.stateQuery, specId) pairingUpdaterRaw 
:= cst.StateTracker.RegisterForUpdates(ctx, pairingUpdater) pairingUpdater, ok := pairingUpdaterRaw.(*updaters.PairingUpdater) if !ok { @@ -82,7 +81,7 @@ func (cst *ConsumerStateTracker) RegisterForPairingUpdates(ctx context.Context, } func (cst *ConsumerStateTracker) RegisterFinalizationConsensusForUpdates(ctx context.Context, finalizationConsensus *lavaprotocol.FinalizationConsensus) { - finalizationConsensusUpdater := updaters.NewFinalizationConsensusUpdater(cst.stateQuery) + finalizationConsensusUpdater := updaters.NewFinalizationConsensusUpdater(cst.stateQuery, finalizationConsensus.SpecId) finalizationConsensusUpdaterRaw := cst.StateTracker.RegisterForUpdates(ctx, finalizationConsensusUpdater) finalizationConsensusUpdater, ok := finalizationConsensusUpdaterRaw.(*updaters.FinalizationConsensusUpdater) if !ok { From 3a490bad12049ea5fdc0d9cc460d84f87e444482 Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Sun, 2 Jun 2024 18:14:31 +0200 Subject: [PATCH 13/33] exposing spec id --- protocol/lavaprotocol/finalization_consensus.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/protocol/lavaprotocol/finalization_consensus.go b/protocol/lavaprotocol/finalization_consensus.go index 88107fecff..ca255e48a9 100644 --- a/protocol/lavaprotocol/finalization_consensus.go +++ b/protocol/lavaprotocol/finalization_consensus.go @@ -20,7 +20,7 @@ type FinalizationConsensus struct { providerDataContainersMu sync.RWMutex currentEpoch uint64 latestBlockByMedian uint64 // for caching - specId string + SpecId string } type ProviderHashesConsensus struct { @@ -41,7 +41,7 @@ type providerDataContainer struct { } func NewFinalizationConsensus(specId string) *FinalizationConsensus { - return &FinalizationConsensus{specId: specId} + return &FinalizationConsensus{SpecId: specId} } func GetLatestFinalizedBlock(latestBlock, blockDistanceForFinalizedData int64) int64 { @@ -150,7 +150,7 @@ func (fc *FinalizationConsensus) 
UpdateFinalizedHashes(blockDistanceForFinalized } } if debug { - utils.LavaFormatDebug("finalization information update successfully", utils.Attribute{Key: "specId", Value: fc.specId}, utils.Attribute{Key: "finalization data", Value: finalizedBlocks}, utils.Attribute{Key: "currentProviderHashesConsensus", Value: fc.currentProviderHashesConsensus}, utils.Attribute{Key: "currentProviderHashesConsensus", Value: fc.currentProviderHashesConsensus}) + utils.LavaFormatDebug("finalization information update successfully", utils.Attribute{Key: "specId", Value: fc.SpecId}, utils.Attribute{Key: "finalization data", Value: finalizedBlocks}, utils.Attribute{Key: "currentProviderHashesConsensus", Value: fc.currentProviderHashesConsensus}, utils.Attribute{Key: "currentProviderHashesConsensus", Value: fc.currentProviderHashesConsensus}) } return finalizationConflict, nil } @@ -185,7 +185,7 @@ func (fc *FinalizationConsensus) NewEpoch(epoch uint64) { if fc.currentEpoch < epoch { if debug { - utils.LavaFormatDebug("finalization information epoch changed", utils.Attribute{Key: "specId", Value: fc.specId}, utils.Attribute{Key: "epoch", Value: epoch}) + utils.LavaFormatDebug("finalization information epoch changed", utils.Attribute{Key: "specId", Value: fc.SpecId}, utils.Attribute{Key: "epoch", Value: epoch}) } // means it's time to refresh the epoch fc.prevEpochProviderHashesConsensus = fc.currentProviderHashesConsensus @@ -256,11 +256,11 @@ func (fc *FinalizationConsensus) ExpectedBlockHeight(chainParser chainlib.ChainP medianOfExpectedBlocks := median(mapExpectedBlockHeights) providersMedianOfLatestBlock := medianOfExpectedBlocks + int64(blockDistanceForFinalizedData) if debug { - utils.LavaFormatDebug("finalization information", utils.Attribute{Key: "specId", Value: fc.specId}, utils.Attribute{Key: "mapExpectedBlockHeights", Value: mapExpectedBlockHeights}, utils.Attribute{Key: "medianOfExpectedBlocks", Value: medianOfExpectedBlocks}, utils.Attribute{Key: "latestBlock", Value: 
fc.latestBlockByMedian}, utils.Attribute{Key: "providersMedianOfLatestBlock", Value: providersMedianOfLatestBlock}) + utils.LavaFormatDebug("finalization information", utils.Attribute{Key: "specId", Value: fc.SpecId}, utils.Attribute{Key: "mapExpectedBlockHeights", Value: mapExpectedBlockHeights}, utils.Attribute{Key: "medianOfExpectedBlocks", Value: medianOfExpectedBlocks}, utils.Attribute{Key: "latestBlock", Value: fc.latestBlockByMedian}, utils.Attribute{Key: "providersMedianOfLatestBlock", Value: providersMedianOfLatestBlock}) } if medianOfExpectedBlocks > 0 && uint64(providersMedianOfLatestBlock) > fc.latestBlockByMedian { if uint64(providersMedianOfLatestBlock) > fc.latestBlockByMedian+1000 && fc.latestBlockByMedian > 0 { - utils.LavaFormatError("uncontinuous jump in finalization data", nil, utils.Attribute{Key: "specId", Value: fc.specId}, utils.Attribute{Key: "s.prevEpochProviderHashesConsensus", Value: fc.prevEpochProviderHashesConsensus}, utils.Attribute{Key: "s.currentProviderHashesConsensus", Value: fc.currentProviderHashesConsensus}, utils.Attribute{Key: "latestBlock", Value: fc.latestBlockByMedian}, utils.Attribute{Key: "providersMedianOfLatestBlock", Value: providersMedianOfLatestBlock}) + utils.LavaFormatError("uncontinuous jump in finalization data", nil, utils.Attribute{Key: "specId", Value: fc.SpecId}, utils.Attribute{Key: "s.prevEpochProviderHashesConsensus", Value: fc.prevEpochProviderHashesConsensus}, utils.Attribute{Key: "s.currentProviderHashesConsensus", Value: fc.currentProviderHashesConsensus}, utils.Attribute{Key: "latestBlock", Value: fc.latestBlockByMedian}, utils.Attribute{Key: "providersMedianOfLatestBlock", Value: providersMedianOfLatestBlock}) } atomic.StoreUint64(&fc.latestBlockByMedian, uint64(providersMedianOfLatestBlock)) // we can only set conflict to "reported". 
} From 8af1a0adfe3ebb49d7ab2af25afa5b1d4c157653 Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Sun, 2 Jun 2024 18:14:44 +0200 Subject: [PATCH 14/33] registering emergency tracker to all chains --- protocol/rpcconsumer/rpcconsumer.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/protocol/rpcconsumer/rpcconsumer.go b/protocol/rpcconsumer/rpcconsumer.go index bfc1700074..fd8bc5ad4b 100644 --- a/protocol/rpcconsumer/rpcconsumer.go +++ b/protocol/rpcconsumer/rpcconsumer.go @@ -312,13 +312,18 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt relaysMonitorAggregator.StartMonitoring(ctx) utils.LavaFormatDebug("Starting Policy Updaters for all chains") - for chain := range chainMutexes { - policyUpdater, ok := policyUpdaters.Load(chain) + for chainId := range chainMutexes { + policyUpdater, ok := policyUpdaters.Load(chainId) if !ok { - utils.LavaFormatError("could not load policy Updater for chain", nil, utils.LogAttr("chain", chain)) + utils.LavaFormatError("could not load policy Updater for chain", nil, utils.LogAttr("chain", chainId)) continue } - consumerStateTracker.RegisterForPairingUpdates(ctx, policyUpdater) + consumerStateTracker.RegisterForPairingUpdates(ctx, policyUpdater, chainId) + emergencyTracker, ok := consumerStateTracker.ConsumerEmergencyTrackerInf.(*statetracker.EmergencyTracker) + if !ok { + utils.LavaFormatFatal("Failed converting consumerStateTracker.ConsumerEmergencyTrackerInf to *statetracker.EmergencyTracker", nil, utils.LogAttr("chain", chainId)) + } + consumerStateTracker.RegisterForPairingUpdates(ctx, emergencyTracker, chainId) } utils.LavaFormatInfo("RPCConsumer done setting up all endpoints, ready for requests") From 42bea106f4b85bfb7b4bd223797d0fdc563aebf8 Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Sun, 2 Jun 2024 18:15:04 +0200 Subject: [PATCH 15/33] finalization consensus updater spec id --- .../updaters/finalization_consensus_updater.go | 7 ++++--- 1 file changed, 4 
insertions(+), 3 deletions(-) diff --git a/protocol/statetracker/updaters/finalization_consensus_updater.go b/protocol/statetracker/updaters/finalization_consensus_updater.go index fbf07faa75..8c1b63dc56 100644 --- a/protocol/statetracker/updaters/finalization_consensus_updater.go +++ b/protocol/statetracker/updaters/finalization_consensus_updater.go @@ -18,10 +18,11 @@ type FinalizationConsensusUpdater struct { registeredFinalizationConsensuses []*lavaprotocol.FinalizationConsensus nextBlockForUpdate uint64 stateQuery *ConsumerStateQuery + specId string } -func NewFinalizationConsensusUpdater(stateQuery *ConsumerStateQuery) *FinalizationConsensusUpdater { - return &FinalizationConsensusUpdater{registeredFinalizationConsensuses: []*lavaprotocol.FinalizationConsensus{}, stateQuery: stateQuery} +func NewFinalizationConsensusUpdater(stateQuery *ConsumerStateQuery, specId string) *FinalizationConsensusUpdater { + return &FinalizationConsensusUpdater{registeredFinalizationConsensuses: []*lavaprotocol.FinalizationConsensus{}, stateQuery: stateQuery, specId: specId} } func (fcu *FinalizationConsensusUpdater) RegisterFinalizationConsensus(finalizationConsensus *lavaprotocol.FinalizationConsensus) { @@ -43,7 +44,7 @@ func (fcu *FinalizationConsensusUpdater) updateInner(latestBlock int64) { } timeoutCtx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() - _, epoch, nextBlockForUpdate, err := fcu.stateQuery.GetPairing(timeoutCtx, "", latestBlock) + _, epoch, nextBlockForUpdate, err := fcu.stateQuery.GetPairing(timeoutCtx, fcu.specId, latestBlock) if err != nil { utils.LavaFormatError("could not get block stats for finalization consensus updater, trying again next block", err, utils.Attribute{Key: "latestBlock", Value: latestBlock}) fcu.nextBlockForUpdate += 1 From c0544d47b1fde2d9dd3242162252e4876363f376 Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Sun, 2 Jun 2024 18:15:21 +0200 Subject: [PATCH 16/33] spec id to pairing updater --- 
protocol/statetracker/updaters/pairing_updater.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/protocol/statetracker/updaters/pairing_updater.go b/protocol/statetracker/updaters/pairing_updater.go index da5a5fa9bd..a56c1617d9 100644 --- a/protocol/statetracker/updaters/pairing_updater.go +++ b/protocol/statetracker/updaters/pairing_updater.go @@ -25,10 +25,11 @@ type PairingUpdater struct { nextBlockForUpdate uint64 stateQuery *ConsumerStateQuery pairingUpdatables []*PairingUpdatable + specId string } -func NewPairingUpdater(stateQuery *ConsumerStateQuery) *PairingUpdater { - return &PairingUpdater{consumerSessionManagersMap: map[string][]*lavasession.ConsumerSessionManager{}, stateQuery: stateQuery} +func NewPairingUpdater(stateQuery *ConsumerStateQuery, specId string) *PairingUpdater { + return &PairingUpdater{consumerSessionManagersMap: map[string][]*lavasession.ConsumerSessionManager{}, stateQuery: stateQuery, specId: specId} } func (pu *PairingUpdater) RegisterPairing(ctx context.Context, consumerSessionManager *lavasession.ConsumerSessionManager) error { @@ -56,7 +57,7 @@ func (pu *PairingUpdater) RegisterPairing(ctx context.Context, consumerSessionMa func (pu *PairingUpdater) RegisterPairingUpdatable(ctx context.Context, pairingUpdatable *PairingUpdatable) error { pu.lock.Lock() defer pu.lock.Unlock() - _, epoch, _, err := pu.stateQuery.GetPairing(ctx, "", -1) + _, epoch, _, err := pu.stateQuery.GetPairing(ctx, pu.specId, -1) if err != nil { return err } @@ -103,7 +104,7 @@ func (pu *PairingUpdater) updateInner(latestBlock int64) { // get latest epoch from cache timeoutCtx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() - _, epoch, _, err := pu.stateQuery.GetPairing(timeoutCtx, "", latestBlock) + _, epoch, _, err := pu.stateQuery.GetPairing(timeoutCtx, pu.specId, latestBlock) if err != nil { utils.LavaFormatError("could not update pairing for updatables, trying again next block", err) 
nextBlockForUpdateList = append(nextBlockForUpdateList, pu.nextBlockForUpdate+1) From 5d397cf9669dfb3f5c7170cbb14f6c3164fcb249 Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Sun, 2 Jun 2024 18:15:54 +0200 Subject: [PATCH 17/33] get pairing does not allow empty anymore. --- protocol/statetracker/updaters/state_query.go | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/protocol/statetracker/updaters/state_query.go b/protocol/statetracker/updaters/state_query.go index 284d925f6f..f95c5ed426 100644 --- a/protocol/statetracker/updaters/state_query.go +++ b/protocol/statetracker/updaters/state_query.go @@ -127,13 +127,7 @@ func (csq *ConsumerStateQuery) GetEffectivePolicy(ctx context.Context, consumerA func (csq *ConsumerStateQuery) GetPairing(ctx context.Context, chainID string, latestBlock int64) (pairingList []epochstoragetypes.StakeEntry, epoch, nextBlockForUpdate uint64, errRet error) { if chainID == "" { - if csq.lastChainID != "" { - chainID = csq.lastChainID - } - if chainID == "" { - chainID = "LAV1" - utils.LavaFormatWarning("failed to run get pairing as there is no entry for empty chainID call, using default chainID", nil, utils.Attribute{Key: "chainID", Value: chainID}) - } + return nil, 0, 0, utils.LavaFormatError("chainid is empty in GetPairing", nil, utils.Attribute{Key: "chainID", Value: chainID}) } cachedInterface, found := csq.ResponsesCache.Get(PairingRespKey + chainID) From 53c5ff0b3cfb1cc31371ac17c1a5278e8c72bca8 Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Sun, 2 Jun 2024 18:16:24 +0200 Subject: [PATCH 18/33] fix comment --- protocol/statetracker/updaters/state_query.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocol/statetracker/updaters/state_query.go b/protocol/statetracker/updaters/state_query.go index f95c5ed426..865c41db6f 100644 --- a/protocol/statetracker/updaters/state_query.go +++ b/protocol/statetracker/updaters/state_query.go @@ -127,7 +127,7 @@ func (csq *ConsumerStateQuery) 
GetEffectivePolicy(ctx context.Context, consumerA func (csq *ConsumerStateQuery) GetPairing(ctx context.Context, chainID string, latestBlock int64) (pairingList []epochstoragetypes.StakeEntry, epoch, nextBlockForUpdate uint64, errRet error) { if chainID == "" { - return nil, 0, 0, utils.LavaFormatError("chainid is empty in GetPairing", nil, utils.Attribute{Key: "chainID", Value: chainID}) + return nil, 0, 0, utils.LavaFormatError("chain id is empty in GetPairing while not allowed", nil, utils.Attribute{Key: "chainID", Value: chainID}) } cachedInterface, found := csq.ResponsesCache.Get(PairingRespKey + chainID) From f7c6fc8761b20c3e52f3e3eee5fb7dcdb3e7729d Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Sun, 2 Jun 2024 18:39:27 +0200 Subject: [PATCH 19/33] rename --- protocol/statetracker/updaters/pairing_updater.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/protocol/statetracker/updaters/pairing_updater.go b/protocol/statetracker/updaters/pairing_updater.go index a56c1617d9..3e1aad27ef 100644 --- a/protocol/statetracker/updaters/pairing_updater.go +++ b/protocol/statetracker/updaters/pairing_updater.go @@ -38,7 +38,7 @@ func (pu *PairingUpdater) RegisterPairing(ctx context.Context, consumerSessionMa if err != nil { return err } - pu.updateConsummerSessionManager(ctx, pairingList, consumerSessionManager, epoch) + pu.updateConsumerSessionManager(ctx, pairingList, consumerSessionManager, epoch) if nextBlockForUpdate > pu.nextBlockForUpdate { // make sure we don't update twice, this updates pu.nextBlockForUpdate pu.Update(int64(nextBlockForUpdate)) @@ -93,7 +93,7 @@ func (pu *PairingUpdater) updateInner(latestBlock int64) { } for _, consumerSessionManager := range consumerSessionManagerList { // same pairing for all apiInterfaces, they pick the right endpoints from inside using our filter function - err = pu.updateConsummerSessionManager(ctx, pairingList, consumerSessionManager, epoch) + err = pu.updateConsumerSessionManager(ctx, 
pairingList, consumerSessionManager, epoch) if err != nil { utils.LavaFormatError("failed updating consumer session manager", err, utils.Attribute{Key: "chainID", Value: chainID}, utils.Attribute{Key: "apiInterface", Value: consumerSessionManager.RPCEndpoint().ApiInterface}, utils.Attribute{Key: "pairingListLen", Value: len(pairingList)}) continue @@ -132,7 +132,7 @@ func (pu *PairingUpdater) Update(latestBlock int64) { pu.updateInner(latestBlock) } -func (pu *PairingUpdater) updateConsummerSessionManager(ctx context.Context, pairingList []epochstoragetypes.StakeEntry, consumerSessionManager *lavasession.ConsumerSessionManager, epoch uint64) (err error) { +func (pu *PairingUpdater) updateConsumerSessionManager(ctx context.Context, pairingList []epochstoragetypes.StakeEntry, consumerSessionManager *lavasession.ConsumerSessionManager, epoch uint64) (err error) { pairingListForThisCSM, err := pu.filterPairingListByEndpoint(ctx, planstypes.Geolocation(consumerSessionManager.RPCEndpoint().Geolocation), pairingList, consumerSessionManager.RPCEndpoint(), epoch) if err != nil { return err From 617a65993ba436f4032837f69cfbeac693f37ca0 Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Sun, 2 Jun 2024 18:56:10 +0200 Subject: [PATCH 20/33] making error more robust --- protocol/rpcconsumer/rpcconsumer_server.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/protocol/rpcconsumer/rpcconsumer_server.go b/protocol/rpcconsumer/rpcconsumer_server.go index befe4e62e8..26272f334f 100644 --- a/protocol/rpcconsumer/rpcconsumer_server.go +++ b/protocol/rpcconsumer/rpcconsumer_server.go @@ -154,11 +154,14 @@ func (rpccs *RPCConsumerServer) waitForPairing() error { } }() + numberOfTimeouts := 0 select { case <-reinitializedChan: break case <-time.After(30 * time.Second): - return utils.LavaFormatError("failed initial relays, csm was not initialized after timeout", nil, + numberOfTimeouts += 1 + utils.LavaFormatWarning("failed initial relays, csm was not initialized 
after timeout, or pairing list is empty for that chain", nil, + utils.LogAttr("times_checked", numberOfTimeouts), utils.LogAttr("chainID", rpccs.listenEndpoint.ChainID), utils.LogAttr("APIInterface", rpccs.listenEndpoint.ApiInterface), ) From de9427a5c755a35129c1ed361dbe1c4a35f3dbf4 Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Sun, 2 Jun 2024 18:56:13 +0200 Subject: [PATCH 21/33] a --- protocol/rpcconsumer/rpcconsumer_server.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/protocol/rpcconsumer/rpcconsumer_server.go b/protocol/rpcconsumer/rpcconsumer_server.go index 26272f334f..b6f97799c8 100644 --- a/protocol/rpcconsumer/rpcconsumer_server.go +++ b/protocol/rpcconsumer/rpcconsumer_server.go @@ -154,14 +154,14 @@ func (rpccs *RPCConsumerServer) waitForPairing() error { } }() - numberOfTimeouts := 0 + numberOfTimesChecked := 0 select { case <-reinitializedChan: break case <-time.After(30 * time.Second): - numberOfTimeouts += 1 + numberOfTimesChecked += 1 utils.LavaFormatWarning("failed initial relays, csm was not initialized after timeout, or pairing list is empty for that chain", nil, - utils.LogAttr("times_checked", numberOfTimeouts), + utils.LogAttr("times_checked", numberOfTimesChecked), utils.LogAttr("chainID", rpccs.listenEndpoint.ChainID), utils.LogAttr("APIInterface", rpccs.listenEndpoint.ApiInterface), ) From 92816db540d601bdabfe3b7786a823ed4a800da7 Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Sun, 2 Jun 2024 18:56:22 +0200 Subject: [PATCH 22/33] setting timeout for get pairing --- protocol/statetracker/updaters/pairing_updater.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/protocol/statetracker/updaters/pairing_updater.go b/protocol/statetracker/updaters/pairing_updater.go index 3e1aad27ef..f5cdff4e9f 100644 --- a/protocol/statetracker/updaters/pairing_updater.go +++ b/protocol/statetracker/updaters/pairing_updater.go @@ -34,7 +34,9 @@ func NewPairingUpdater(stateQuery *ConsumerStateQuery, specId 
string) *PairingUp func (pu *PairingUpdater) RegisterPairing(ctx context.Context, consumerSessionManager *lavasession.ConsumerSessionManager) error { chainID := consumerSessionManager.RPCEndpoint().ChainID - pairingList, epoch, nextBlockForUpdate, err := pu.stateQuery.GetPairing(context.Background(), chainID, -1) + timeoutCtx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + pairingList, epoch, nextBlockForUpdate, err := pu.stateQuery.GetPairing(timeoutCtx, chainID, -1) if err != nil { return err } From 2c3341310b49ffc2f4353ef774afbb1deabb647d Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Sun, 2 Jun 2024 18:56:35 +0200 Subject: [PATCH 23/33] chaning log to warning and making it more clear --- protocol/statetracker/updaters/state_query.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocol/statetracker/updaters/state_query.go b/protocol/statetracker/updaters/state_query.go index 865c41db6f..9d39452c15 100644 --- a/protocol/statetracker/updaters/state_query.go +++ b/protocol/statetracker/updaters/state_query.go @@ -151,7 +151,7 @@ func (csq *ConsumerStateQuery) GetPairing(ctx context.Context, chainID string, l csq.lastChainID = chainID csq.ResponsesCache.SetWithTTL(PairingRespKey+chainID, pairingResp, 1, DefaultTimeToLiveExpiration) if len(pairingResp.Providers) == 0 { - utils.LavaFormatError("Chain returned empty provider list, check node connection and consumer subscription status", nil, + utils.LavaFormatWarning("Chain returned empty provider list, check node connection and consumer subscription status, or no providers provide this chain", nil, utils.LogAttr("chainId", chainID), utils.LogAttr("epoch", pairingResp.CurrentEpoch), utils.LogAttr("consumer_address", csq.clientCtx.FromAddress.String()), From 602a078a090177ffd4d308a3a6e8a4a83db2eb2b Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Sun, 2 Jun 2024 18:56:49 +0200 Subject: [PATCH 24/33] retry on register pairing. 
--- protocol/statetracker/consumer_state_tracker.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/protocol/statetracker/consumer_state_tracker.go b/protocol/statetracker/consumer_state_tracker.go index 9b6ed6cf12..b2aec89c02 100644 --- a/protocol/statetracker/consumer_state_tracker.go +++ b/protocol/statetracker/consumer_state_tracker.go @@ -61,9 +61,21 @@ func (cst *ConsumerStateTracker) RegisterConsumerSessionManagerForPairingUpdates if !ok { utils.LavaFormatFatal("invalid updater type returned from RegisterForUpdates", nil, utils.Attribute{Key: "updater", Value: pairingUpdaterRaw}) } + err := pairingUpdater.RegisterPairing(ctx, consumerSessionManager) if err != nil { utils.LavaFormatError("failed registering for pairing updates", err, utils.Attribute{Key: "data", Value: consumerSessionManager.RPCEndpoint()}) + // if failed registering pairing, continue trying asynchronously + go func() { + numberOfAttempts := 1 + for { + err := pairingUpdater.RegisterPairing(ctx, consumerSessionManager) + if err == nil { + break + } + utils.LavaFormatError("Failed retry RegisterPairing", err, utils.LogAttr("attempt", numberOfAttempts)) + } + }() } } From 1e993b1cc356f46bac27d2007a1ccc6ef29a086a Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Sun, 2 Jun 2024 18:57:42 +0200 Subject: [PATCH 25/33] robustness overload --- protocol/statetracker/consumer_state_tracker.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/protocol/statetracker/consumer_state_tracker.go b/protocol/statetracker/consumer_state_tracker.go index b2aec89c02..5d9c51f2ed 100644 --- a/protocol/statetracker/consumer_state_tracker.go +++ b/protocol/statetracker/consumer_state_tracker.go @@ -64,16 +64,15 @@ func (cst *ConsumerStateTracker) RegisterConsumerSessionManagerForPairingUpdates err := pairingUpdater.RegisterPairing(ctx, consumerSessionManager) if err != nil { - utils.LavaFormatError("failed registering for pairing updates", err, utils.Attribute{Key: "data", Value: 
consumerSessionManager.RPCEndpoint()}) // if failed registering pairing, continue trying asynchronously go func() { - numberOfAttempts := 1 + numberOfAttempts := 0 for { + utils.LavaFormatError("Failed retry RegisterPairing", err, utils.LogAttr("attempt", numberOfAttempts), utils.Attribute{Key: "data", Value: consumerSessionManager.RPCEndpoint()}) err := pairingUpdater.RegisterPairing(ctx, consumerSessionManager) if err == nil { break } - utils.LavaFormatError("Failed retry RegisterPairing", err, utils.LogAttr("attempt", numberOfAttempts)) } }() } From 6f4f5114bf72afd0baea03e01b89bf3af45e4a17 Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Sun, 2 Jun 2024 19:10:24 +0200 Subject: [PATCH 26/33] .. --- protocol/lavasession/consumer_session_manager_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocol/lavasession/consumer_session_manager_test.go b/protocol/lavasession/consumer_session_manager_test.go index f097557e91..b28a60c359 100644 --- a/protocol/lavasession/consumer_session_manager_test.go +++ b/protocol/lavasession/consumer_session_manager_test.go @@ -127,7 +127,7 @@ func TestEndpointSortingFlow(t *testing.T) { // because probing is in a routine we need to wait for the sorting and probing to end asynchronously swapped := false - for i := 0; i < 10; i++ { + for i := 0; i < 20; i++ { if pairingList[0].Endpoints[0].NetworkAddress == grpcListener { fmt.Println("Endpoints Are Sorted!", i) swapped = true From b37d04445d78f1a54ad09c32d4614d3b0fa1b0f3 Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Mon, 3 Jun 2024 10:28:57 +0200 Subject: [PATCH 27/33] fixing spec for cosmoshub --- cookbook/specs/spec_add_cosmossdk.json | 6 ++++-- .../rpcInterfaceMessages/grpcMessage.go | 16 +++++++++------- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/cookbook/specs/spec_add_cosmossdk.json b/cookbook/specs/spec_add_cosmossdk.json index cada7f7a6a..404981579d 100644 --- a/cookbook/specs/spec_add_cosmossdk.json +++ 
b/cookbook/specs/spec_add_cosmossdk.json @@ -2175,7 +2175,8 @@ }, { "name": "grpc-metadata-x-cosmos-block-height", - "kind": "pass_both" + "kind": "pass_both", + "function_tag": "SET_LATEST_IN_METADATA" } ], "inheritance_apis": [], @@ -4687,7 +4688,8 @@ "headers": [ { "name": "x-cosmos-block-height", - "kind": "pass_both" + "kind": "pass_both", + "function_tag": "SET_LATEST_IN_METADATA" }, { "name": "grpc-metadata-x-cosmos-block-height", diff --git a/protocol/chainlib/chainproxy/rpcInterfaceMessages/grpcMessage.go b/protocol/chainlib/chainproxy/rpcInterfaceMessages/grpcMessage.go index 3d42b1cf1e..00bb6dc664 100644 --- a/protocol/chainlib/chainproxy/rpcInterfaceMessages/grpcMessage.go +++ b/protocol/chainlib/chainproxy/rpcInterfaceMessages/grpcMessage.go @@ -44,14 +44,16 @@ func (jm GrpcMessage) CheckResponseError(data []byte, httpStatusCode int) (hasEr // GetParams will be deprecated after we remove old client // Currently needed because of parser.RPCInput interface func (gm GrpcMessage) GetParams() interface{} { - if gm.Msg[0] == '{' || gm.Msg[0] == '[' { - var parsedData interface{} - err := json.Unmarshal(gm.Msg, &parsedData) - if err != nil { - utils.LavaFormatError("failed to unmarshal GetParams", err) - return nil + if len(gm.Msg) > 0 { + if gm.Msg[0] == '{' || gm.Msg[0] == '[' { + var parsedData interface{} + err := json.Unmarshal(gm.Msg, &parsedData) + if err != nil { + utils.LavaFormatError("failed to unmarshal GetParams", err) + return nil + } + return parsedData } - return parsedData } parsedData, err := gm.dynamicResolve() if err != nil { From 1a6fae863956e15dca8d363b8f303957291b03b9 Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Mon, 3 Jun 2024 12:17:03 +0200 Subject: [PATCH 28/33] fixing grpc header issue on response. 
--- protocol/chainlib/grpcproxy/grpcproxy.go | 14 +++++++++++++- protocol/statetracker/consumer_state_tracker.go | 2 ++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/protocol/chainlib/grpcproxy/grpcproxy.go b/protocol/chainlib/grpcproxy/grpcproxy.go index c07a4c4f32..ee53c3348e 100644 --- a/protocol/chainlib/grpcproxy/grpcproxy.go +++ b/protocol/chainlib/grpcproxy/grpcproxy.go @@ -3,6 +3,7 @@ package grpcproxy import ( "context" "net/http" + "strings" "github.com/gofiber/fiber/v2" "github.com/improbable-eng/grpc-web/go/grpcweb" @@ -79,7 +80,18 @@ func makeProxyFunc(callBack ProxyCallBack) grpc.StreamHandler { if err != nil { return err } - stream.SetHeader(md) + + // Convert metadata keys to lowercase + lowercaseMD := metadata.New(map[string]string{}) + for k, v := range md { + lowerKey := strings.ToLower(k) + lowercaseMD[lowerKey] = v + } + md = lowercaseMD + + if err := stream.SetHeader(md); err != nil { + utils.LavaFormatError("Got error when setting header", err, utils.LogAttr("headers", md)) + } return stream.SendMsg(respBytes) } } diff --git a/protocol/statetracker/consumer_state_tracker.go b/protocol/statetracker/consumer_state_tracker.go index 5d9c51f2ed..184ede2a97 100644 --- a/protocol/statetracker/consumer_state_tracker.go +++ b/protocol/statetracker/consumer_state_tracker.go @@ -2,6 +2,7 @@ package statetracker import ( "context" + "time" "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/client/tx" @@ -69,6 +70,7 @@ func (cst *ConsumerStateTracker) RegisterConsumerSessionManagerForPairingUpdates numberOfAttempts := 0 for { utils.LavaFormatError("Failed retry RegisterPairing", err, utils.LogAttr("attempt", numberOfAttempts), utils.Attribute{Key: "data", Value: consumerSessionManager.RPCEndpoint()}) + time.Sleep(5 * time.Second) // sleep so we don't spam get pairing for no reason err := pairingUpdater.RegisterPairing(ctx, consumerSessionManager) if err == nil { break From be2d01d8bc143c3f0cc12a53c616223e54f317c2 Mon Sep 
17 00:00:00 2001 From: Ran Mishael Date: Mon, 3 Jun 2024 15:27:18 +0200 Subject: [PATCH 29/33] fix test --- .../consumer_session_manager_test.go | 25 +++++++++++++++---- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/protocol/lavasession/consumer_session_manager_test.go b/protocol/lavasession/consumer_session_manager_test.go index b28a60c359..68958d88d9 100644 --- a/protocol/lavasession/consumer_session_manager_test.go +++ b/protocol/lavasession/consumer_session_manager_test.go @@ -40,6 +40,9 @@ const ( maxCuForVirtualEpoch = uint64(200) ) +// This variable will hold grpc server address +var grpcListener = "localhost:0" + type testServer struct { delay time.Duration } @@ -94,23 +97,38 @@ func getDelayedAddress() string { if grpcListener == delayedServerAddress { delayedServerAddress = "127.0.0.1:3336" } + utils.LavaFormatDebug("delayedAddress Chosen", utils.LogAttr("address", delayedServerAddress)) return delayedServerAddress } func TestEndpointSortingFlow(t *testing.T) { delayedAddress := getDelayedAddress() - err := createGRPCServer(delayedAddress, time.Millisecond) + err := createGRPCServer(delayedAddress, 300*time.Millisecond) csp := &ConsumerSessionsWithProvider{} for { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) _, _, err := csp.ConnectRawClientWithTimeout(ctx, delayedAddress) if err != nil { - utils.LavaFormatDebug("waiting for grpc server to launch") + utils.LavaFormatDebug("delayedAddress - waiting for grpc server to launch") + continue + } + utils.LavaFormatDebug("delayedAddress - grpc server is live", utils.LogAttr("address", delayedAddress)) + cancel() + break + } + + for { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + _, _, err := csp.ConnectRawClientWithTimeout(ctx, grpcListener) + if err != nil { + utils.LavaFormatDebug("grpcListener - waiting for grpc server to launch") continue } + utils.LavaFormatDebug("grpcListener - grpc server is live", utils.LogAttr("address", 
grpcListener)) cancel() break } + require.NoError(t, err) csm := CreateConsumerSessionManager() pairingList := createPairingList("", true) @@ -140,9 +158,6 @@ func TestEndpointSortingFlow(t *testing.T) { // after creating all the sessions } -// This variable will hold grpc server address -var grpcListener = "localhost:0" - func CreateConsumerSessionManager() *ConsumerSessionManager { rand.InitRandomSeed() baseLatency := common.AverageWorldLatency / 2 // we want performance to be half our timeout or better From 64a3c14f9e6b436775ab57edfc97a67b909d6c17 Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Mon, 3 Jun 2024 15:52:56 +0200 Subject: [PATCH 30/33] lava debug --- protocol/common/endpoints.go | 3 +-- protocol/rpcconsumer/rpcconsumer_server.go | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/protocol/common/endpoints.go b/protocol/common/endpoints.go index 5c8297a51e..2b3559aea2 100644 --- a/protocol/common/endpoints.go +++ b/protocol/common/endpoints.go @@ -29,8 +29,7 @@ const ( RELAY_TIMEOUT_HEADER_NAME = "lava-relay-timeout" EXTENSION_OVERRIDE_HEADER_NAME = "lava-extension" FORCE_CACHE_REFRESH_HEADER_NAME = "lava-force-cache-refresh" - GET_BLOCKED_PROVIDERS = "lava-get-blocked-providers" - GET_ERRORED_PROVIDERS = "lava-get-errored-providers" + LAVA_DEBUG = "lava-debug" // send http request to /lava/health to see if the process is up - (ret code 200) DEFAULT_HEALTH_PATH = "/lava/health" MAXIMUM_ALLOWED_TIMEOUT_EXTEND_MULTIPLIER_BY_THE_CONSUMER = 4 diff --git a/protocol/rpcconsumer/rpcconsumer_server.go b/protocol/rpcconsumer/rpcconsumer_server.go index b6f97799c8..db8856ef1a 100644 --- a/protocol/rpcconsumer/rpcconsumer_server.go +++ b/protocol/rpcconsumer/rpcconsumer_server.go @@ -976,8 +976,7 @@ func (rpccs *RPCConsumerServer) LavaDirectiveHeaders(metadata []pairingtypes.Met case common.RELAY_TIMEOUT_HEADER_NAME: case common.EXTENSION_OVERRIDE_HEADER_NAME: case common.FORCE_CACHE_REFRESH_HEADER_NAME: - case common.GET_BLOCKED_PROVIDERS: - 
case common.GET_ERRORED_PROVIDERS: + case common.LAVA_DEBUG: headerDirectives[name] = metaElement.Value default: metadataRet = append(metadataRet, metaElement) From 1c94b8d4fa6f00a76c58bb9784f8e7cde1a0d16f Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Wed, 5 Jun 2024 11:01:59 +0200 Subject: [PATCH 31/33] fixing name --- protocol/rpcconsumer/relay_processor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocol/rpcconsumer/relay_processor.go b/protocol/rpcconsumer/relay_processor.go index 1fc0721b28..9655edd94a 100644 --- a/protocol/rpcconsumer/relay_processor.go +++ b/protocol/rpcconsumer/relay_processor.go @@ -76,7 +76,7 @@ func (rp *RelayProcessor) GetAllowSessionDegradation() bool { } // in case we had an extension and managed to get a session successfully, we prevent session degradation. -func (rp *RelayProcessor) SetAllowSessionDegradation() { +func (rp *RelayProcessor) SetDisallowDegradation() { atomic.StoreUint32(&rp.allowSessionDegradation, 1) } From 2d0d9dbc5a2ae6ffd22ac81fcf42cd8e176fd931 Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Wed, 5 Jun 2024 11:02:07 +0200 Subject: [PATCH 32/33] fixing small bugs --- protocol/rpcconsumer/rpcconsumer_server.go | 37 +++++++++++----------- 1 file changed, 18 insertions(+), 19 deletions(-) diff --git a/protocol/rpcconsumer/rpcconsumer_server.go b/protocol/rpcconsumer/rpcconsumer_server.go index db8856ef1a..b451be0827 100644 --- a/protocol/rpcconsumer/rpcconsumer_server.go +++ b/protocol/rpcconsumer/rpcconsumer_server.go @@ -132,16 +132,13 @@ func (rpccs *RPCConsumerServer) ServeRPCRequests(ctx context.Context, listenEndp func (rpccs *RPCConsumerServer) sendCraftedRelaysWrapper(initialRelays bool) (bool, error) { if initialRelays { // Only start after everything is initialized - check consumer session manager - err := rpccs.waitForPairing() - if err != nil { - return false, err - } + rpccs.waitForPairing() } return rpccs.sendCraftedRelays(MaxRelayRetries, initialRelays) } -func (rpccs 
*RPCConsumerServer) waitForPairing() error { +func (rpccs *RPCConsumerServer) waitForPairing() { reinitializedChan := make(chan bool) go func() { @@ -155,19 +152,19 @@ func (rpccs *RPCConsumerServer) waitForPairing() error { }() numberOfTimesChecked := 0 - select { - case <-reinitializedChan: - break - case <-time.After(30 * time.Second): - numberOfTimesChecked += 1 - utils.LavaFormatWarning("failed initial relays, csm was not initialized after timeout, or pairing list is empty for that chain", nil, - utils.LogAttr("times_checked", numberOfTimesChecked), - utils.LogAttr("chainID", rpccs.listenEndpoint.ChainID), - utils.LogAttr("APIInterface", rpccs.listenEndpoint.ApiInterface), - ) + for { + select { + case <-reinitializedChan: + return + case <-time.After(30 * time.Second): + numberOfTimesChecked += 1 + utils.LavaFormatWarning("failed initial relays, csm was not initialized after timeout, or pairing list is empty for that chain", nil, + utils.LogAttr("times_checked", numberOfTimesChecked), + utils.LogAttr("chainID", rpccs.listenEndpoint.ChainID), + utils.LogAttr("APIInterface", rpccs.listenEndpoint.ApiInterface), + ) + } } - - return nil } func (rpccs *RPCConsumerServer) craftRelay(ctx context.Context) (ok bool, relay *pairingtypes.RelayPrivateData, chainMessage chainlib.ChainMessage, err error) { @@ -563,7 +560,9 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider( return err } relayProcessor.setSkipDataReliability(true) // disabling data reliability when disabling extensions. 
- relayRequestData.Extensions = []string{} + relayRequestData.Extensions = []string{} // reset request data extensions + extensions = []*spectypes.Extension{} // reset extensions too so we wont hit SetDisallowDegradation + } else { return err } @@ -574,7 +573,7 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider( // making sure next get sessions wont use regular providers if len(extensions) > 0 { - relayProcessor.SetAllowSessionDegradation() + relayProcessor.SetDisallowDegradation() } // Iterate over the sessions map From 342fb2f68f87b48e4b58ad930187d50ad5d9cd2a Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Wed, 5 Jun 2024 11:47:22 +0200 Subject: [PATCH 33/33] lint --- protocol/rpcconsumer/rpcconsumer_server.go | 1 - 1 file changed, 1 deletion(-) diff --git a/protocol/rpcconsumer/rpcconsumer_server.go b/protocol/rpcconsumer/rpcconsumer_server.go index b451be0827..a34df6af4a 100644 --- a/protocol/rpcconsumer/rpcconsumer_server.go +++ b/protocol/rpcconsumer/rpcconsumer_server.go @@ -562,7 +562,6 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider( relayProcessor.setSkipDataReliability(true) // disabling data reliability when disabling extensions. relayRequestData.Extensions = []string{} // reset request data extensions extensions = []*spectypes.Extension{} // reset extensions too so we wont hit SetDisallowDegradation - } else { return err }