Skip to content

Commit

Permalink
fixing dependencies for getpairing errors on consumer side.
Browse files Browse the repository at this point in the history
  • Loading branch information
ranlavanet committed Aug 25, 2024
1 parent ea235a1 commit 7c637a0
Show file tree
Hide file tree
Showing 12 changed files with 76 additions and 19 deletions.
1 change: 1 addition & 0 deletions protocol/common/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const (
GUID_HEADER_NAME = "Lava-Guid"
ERRORED_PROVIDERS_HEADER_NAME = "Lava-Errored-Providers"
REPORTED_PROVIDERS_HEADER_NAME = "Lava-Reported-Providers"
LAVAP_VERSION_HEADER_NAME = "Lavap-Version"
LAVA_CONSUMER_PROCESS_GUID = "lava-consumer-process-guid"
// these headers need to be lowercase
BLOCK_PROVIDERS_ADDRESSES_HEADER_NAME = "lava-providers-block"
Expand Down
2 changes: 1 addition & 1 deletion protocol/integration/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (m *mockConsumerStateTracker) RegisterForSpecUpdates(ctx context.Context, s
return nil
}

func (m *mockConsumerStateTracker) RegisterFinalizationConsensusForUpdates(context.Context, *finalizationconsensus.FinalizationConsensus) {
func (m *mockConsumerStateTracker) RegisterFinalizationConsensusForUpdates(context.Context, *finalizationconsensus.FinalizationConsensus, bool) {
}

func (m *mockConsumerStateTracker) RegisterForDowntimeParamsUpdates(ctx context.Context, downtimeParamsUpdatable updaters.DowntimeParamsUpdatable) error {
Expand Down
6 changes: 6 additions & 0 deletions protocol/lavasession/consumer_session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ func (csm *ConsumerSessionManager) RPCEndpoint() RPCEndpoint {
return *csm.rpcEndpoint
}

func (csm *ConsumerSessionManager) GetNumberOfValidProviders() int {
csm.lock.RLock()
defer csm.lock.RUnlock()
return len(csm.validAddresses)
}

func (csm *ConsumerSessionManager) UpdateAllProviders(epoch uint64, pairingList map[uint64]*ConsumerSessionsWithProvider) error {
pairingListLength := len(pairingList)
// TODO: we can block updating until some of the probing is done, this can prevent failed attempts on epoch change when we have no information on the providers,
Expand Down
5 changes: 4 additions & 1 deletion protocol/lavasession/consumer_session_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,10 @@ func (bpm DirectiveHeaders) GetBlockedProviders() []string {
}
blockedProviders, ok := bpm.directiveHeaders[common.BLOCK_PROVIDERS_ADDRESSES_HEADER_NAME]
if ok {
return strings.Split(blockedProviders, ",")
blockProviders := strings.Split(blockedProviders, ",")
if len(blockProviders) <= 2 {
return blockProviders
}
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion protocol/rpcconsumer/consumer_state_tracker_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 12 additions & 3 deletions protocol/rpcconsumer/rpcconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ type ConsumerStateTrackerInf interface {
RegisterForVersionUpdates(ctx context.Context, version *protocoltypes.Version, versionValidator updaters.VersionValidationInf)
RegisterConsumerSessionManagerForPairingUpdates(ctx context.Context, consumerSessionManager *lavasession.ConsumerSessionManager, staticProvidersList []*lavasession.RPCProviderEndpoint)
RegisterForSpecUpdates(ctx context.Context, specUpdatable updaters.SpecUpdatable, endpoint lavasession.RPCEndpoint) error
RegisterFinalizationConsensusForUpdates(context.Context, *finalizationconsensus.FinalizationConsensus)
RegisterFinalizationConsensusForUpdates(context.Context, *finalizationconsensus.FinalizationConsensus, bool)
RegisterForDowntimeParamsUpdates(ctx context.Context, downtimeParamsUpdatable updaters.DowntimeParamsUpdatable) error
TxConflictDetection(ctx context.Context, finalizationConflict *conflicttypes.FinalizationConflict, responseConflict *conflicttypes.ResponseConflict, conflictHandler common.ConflictHandlerInterface) error
GetConsumerPolicy(ctx context.Context, consumerAddress, chainID string) (*plantypes.Policy, error)
Expand Down Expand Up @@ -234,6 +234,15 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt
return err
}

// Filter the relevant static providers
relevantStaticProviderList := []*lavasession.RPCProviderEndpoint{}
for _, staticProvider := range options.staticProvidersList {
if staticProvider.ChainID == rpcEndpoint.ChainID {
relevantStaticProviderList = append(relevantStaticProviderList, staticProvider)
}
}
staticProvidersActive := len(relevantStaticProviderList) > 0

_, averageBlockTime, _, _ := chainParser.ChainBlockStats()
var optimizer *provideroptimizer.ProviderOptimizer
var consumerConsistency *ConsumerConsistency
Expand Down Expand Up @@ -273,7 +282,7 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt
if !exists {
// doesn't exist for this chain create a new one
finalizationConsensus = finalizationconsensus.NewFinalizationConsensus(rpcEndpoint.ChainID)
consumerStateTracker.RegisterFinalizationConsensusForUpdates(ctx, finalizationConsensus)
consumerStateTracker.RegisterFinalizationConsensusForUpdates(ctx, finalizationConsensus, staticProvidersActive)
finalizationConsensuses.Store(chainID, finalizationConsensus)
} else {
var ok bool
Expand Down Expand Up @@ -301,7 +310,7 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt
activeSubscriptionProvidersStorage := lavasession.NewActiveSubscriptionProvidersStorage()
consumerSessionManager := lavasession.NewConsumerSessionManager(rpcEndpoint, optimizer, consumerMetricsManager, consumerReportsManager, consumerAddr.String(), activeSubscriptionProvidersStorage)
// Register For Updates
rpcc.consumerStateTracker.RegisterConsumerSessionManagerForPairingUpdates(ctx, consumerSessionManager, options.staticProvidersList)
rpcc.consumerStateTracker.RegisterConsumerSessionManagerForPairingUpdates(ctx, consumerSessionManager, relevantStaticProviderList)

var relaysMonitor *metrics.RelaysMonitor
if options.cmdFlags.RelaysHealthEnableFlag {
Expand Down
14 changes: 14 additions & 0 deletions protocol/rpcconsumer/rpcconsumer_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/lavanet/lava/v2/protocol/lavasession"
"github.com/lavanet/lava/v2/protocol/metrics"
"github.com/lavanet/lava/v2/protocol/performance"
"github.com/lavanet/lava/v2/protocol/upgrade"
"github.com/lavanet/lava/v2/utils"
"github.com/lavanet/lava/v2/utils/protocopy"
"github.com/lavanet/lava/v2/utils/rand"
Expand Down Expand Up @@ -227,7 +228,14 @@ func (rpccs *RPCConsumerServer) sendRelayWithRetries(ctx context.Context, retrie
success := false
var err error
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, protocolMessage, rpccs.consumerConsistency, "-init-", "", rpccs.debugRelays, rpccs.rpcConsumerLogs, rpccs, rpccs.disableNodeErrorRetry, rpccs.relayRetriesManager)
usedProvidersResets := 1
for i := 0; i < retries; i++ {
// Check if we even have enough providers to communicate with them all.
// If we have 1 provider we will reset the used providers always.
if ((i + 1) * usedProvidersResets) > rpccs.consumerSessionManager.GetNumberOfValidProviders() {
usedProvidersResets++
relayProcessor.GetUsedProviders().ClearUnwanted()
}
err = rpccs.sendRelayToProvider(ctx, protocolMessage, "-init-", "", relayProcessor, nil)
if lavasession.PairingListEmptyError.Is(err) {
// we don't have pairings anymore, could be related to unwanted providers
Expand Down Expand Up @@ -1418,6 +1426,12 @@ func (rpccs *RPCConsumerServer) appendHeadersToRelayResult(ctx context.Context,
}
relayResult.Reply.Metadata = append(relayResult.Reply.Metadata, reportedProvidersMD)
}

version := pairingtypes.Metadata{
Name: common.LAVAP_VERSION_HEADER_NAME,
Value: upgrade.GetCurrentVersion().ConsumerVersion,
}
relayResult.Reply.Metadata = append(relayResult.Reply.Metadata, version)
}

relayResult.Reply.Metadata = append(relayResult.Reply.Metadata, metadataReply...)
Expand Down
4 changes: 2 additions & 2 deletions protocol/statetracker/consumer_state_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ func (cst *ConsumerStateTracker) RegisterForPairingUpdates(ctx context.Context,
}
}

func (cst *ConsumerStateTracker) RegisterFinalizationConsensusForUpdates(ctx context.Context, finalizationConsensus *finalizationconsensus.FinalizationConsensus) {
finalizationConsensusUpdater := updaters.NewFinalizationConsensusUpdater(cst.stateQuery, finalizationConsensus.SpecId)
func (cst *ConsumerStateTracker) RegisterFinalizationConsensusForUpdates(ctx context.Context, finalizationConsensus *finalizationconsensus.FinalizationConsensus, ignoreQueryErrors bool) {
finalizationConsensusUpdater := updaters.NewFinalizationConsensusUpdater(cst.stateQuery, finalizationConsensus.SpecId, ignoreQueryErrors)
finalizationConsensusUpdaterRaw := cst.StateTracker.RegisterForUpdates(ctx, finalizationConsensusUpdater)
finalizationConsensusUpdater, ok := finalizationConsensusUpdaterRaw.(*updaters.FinalizationConsensusUpdater)
if !ok {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ type FinalizationConsensusUpdater struct {
nextBlockForUpdate uint64
stateQuery *ConsumerStateQuery
specId string
ignoreQueryErrors bool // used when static providers are configured so we don't spam errors on failed get pairing.
}

func NewFinalizationConsensusUpdater(stateQuery *ConsumerStateQuery, specId string) *FinalizationConsensusUpdater {
return &FinalizationConsensusUpdater{registeredFinalizationConsensuses: []*finalizationconsensus.FinalizationConsensus{}, stateQuery: stateQuery, specId: specId}
func NewFinalizationConsensusUpdater(stateQuery *ConsumerStateQuery, specId string, ignoreQueryErrors bool) *FinalizationConsensusUpdater {
return &FinalizationConsensusUpdater{registeredFinalizationConsensuses: []*finalizationconsensus.FinalizationConsensus{}, stateQuery: stateQuery, specId: specId, ignoreQueryErrors: ignoreQueryErrors}
}

func (fcu *FinalizationConsensusUpdater) RegisterFinalizationConsensus(finalizationConsensus *finalizationconsensus.FinalizationConsensus) {
Expand All @@ -46,7 +47,9 @@ func (fcu *FinalizationConsensusUpdater) updateInner(latestBlock int64) {
defer cancel()
_, epoch, nextBlockForUpdate, err := fcu.stateQuery.GetPairing(timeoutCtx, fcu.specId, latestBlock)
if err != nil {
utils.LavaFormatError("could not get block stats for finalization consensus updater, trying again next block", err, utils.Attribute{Key: "latestBlock", Value: latestBlock})
if !fcu.ignoreQueryErrors {
utils.LavaFormatError("could not get block stats for finalization consensus updater, trying again next block", err, utils.Attribute{Key: "latestBlock", Value: latestBlock})
}
fcu.nextBlockForUpdate += 1
return
}
Expand Down
28 changes: 24 additions & 4 deletions protocol/statetracker/updaters/pairing_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func NewPairingUpdater(stateQuery ConsumerStateQueryInf, specId string) *Pairing
return &PairingUpdater{consumerSessionManagersMap: map[string][]ConsumerSessionManagerInf{}, stateQuery: stateQuery, specId: specId, staticProviders: []*lavasession.RPCProviderEndpoint{}}
}

func (pu *PairingUpdater) updateStaticProviders(staticProviders []*lavasession.RPCProviderEndpoint) {
func (pu *PairingUpdater) updateStaticProviders(staticProviders []*lavasession.RPCProviderEndpoint) int {
pu.lock.Lock()
defer pu.lock.Unlock()
if len(staticProviders) > 0 && len(pu.staticProviders) == 0 {
Expand All @@ -56,17 +56,30 @@ func (pu *PairingUpdater) updateStaticProviders(staticProviders []*lavasession.R
}
}
}
// return length of relevant static providers
return len(pu.staticProviders)
}

func (pu *PairingUpdater) getNumberOfStaticProviders() int {
pu.lock.RLock()
defer pu.lock.RUnlock()
return len(pu.staticProviders)
}

func (pu *PairingUpdater) RegisterPairing(ctx context.Context, consumerSessionManager ConsumerSessionManagerInf, staticProviders []*lavasession.RPCProviderEndpoint) error {
chainID := consumerSessionManager.RPCEndpoint().ChainID
timeoutCtx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
pairingList, epoch, nextBlockForUpdate, err := pu.stateQuery.GetPairing(timeoutCtx, chainID, -1)
numberOfRelevantProviders := pu.updateStaticProviders(staticProviders)
// If user configured static providers but did not buy a subscription
if err != nil {
return err
if numberOfRelevantProviders == 0 {
return err
}
// else we continue with static providers.
epoch += 1
}
pu.updateStaticProviders(staticProviders)
pu.updateConsumerSessionManager(ctx, pairingList, consumerSessionManager, epoch)
if nextBlockForUpdate > pu.nextBlockForUpdate {
// make sure we don't update twice, this updates pu.nextBlockForUpdate
Expand All @@ -88,6 +101,10 @@ func (pu *PairingUpdater) RegisterPairingUpdatable(ctx context.Context, pairingU
defer pu.lock.Unlock()
_, epoch, _, err := pu.stateQuery.GetPairing(ctx, pu.specId, -1)
if err != nil {
if len(pu.staticProviders) > 0 {
// skipping errors for get pairing if static providers are set.
return nil
}
return err
}

Expand All @@ -114,6 +131,9 @@ func (pu *PairingUpdater) updateInner(latestBlock int64) {
pairingList, epoch, nextBlockForUpdate, err := pu.stateQuery.GetPairing(timeoutCtx, chainID, latestBlock)
cancel()
if err != nil {
if len(pu.staticProviders) > 0 {
return
}
utils.LavaFormatError("could not update pairing for chain, trying again next block", err, utils.Attribute{Key: "chain", Value: chainID})
nextBlockForUpdateList = append(nextBlockForUpdateList, pu.nextBlockForUpdate+1)
continue
Expand Down Expand Up @@ -255,7 +275,7 @@ func (pu *PairingUpdater) filterPairingListByEndpoint(ctx context.Context, curre
provider.Stake,
)
}
if len(pairing) == 0 {
if len(pairing)+pu.getNumberOfStaticProviders() == 0 {
return nil, utils.LavaFormatError("Failed getting pairing for consumer, pairing is empty", err, utils.Attribute{Key: "apiInterface", Value: rpcEndpoint.ApiInterface}, utils.Attribute{Key: "ChainID", Value: rpcEndpoint.ChainID}, utils.Attribute{Key: "geolocation", Value: rpcEndpoint.Geolocation})
}
// replace previous pairing with new providers
Expand Down
2 changes: 1 addition & 1 deletion protocol/statetracker/updaters/state_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (csq *ConsumerStateQuery) GetPairing(ctx context.Context, chainID string, l
Client: csq.clientCtx.FromAddress.String(),
})
if err != nil {
return nil, 0, 0, utils.LavaFormatError("Failed in get pairing query", err, utils.Attribute{})
return nil, 0, 0, err
}
csq.lastChainID = chainID
csq.ResponsesCache.SetWithTTL(PairingRespKey+chainID, pairingResp, 1, DefaultTimeToLiveExpiration)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ make install-all
echo "[Test Setup] setting up a new lava node"
screen -d -m -S node bash -c "./scripts/start_env_dev.sh"
screen -ls
echo "[Lavavisor Setup] sleeping 20 seconds for node to finish setup (if its not enough increase timeout)"
sleep 20
echo "[Test Setup] sleeping 20 seconds for node to finish setup (if its not enough increase timeout)"
sleep 5
wait_for_lava_node_to_start

GASPRICE="0.00002ulava"
lavad tx gov submit-legacy-proposal spec-add ./cookbook/specs/ibc.json,./cookbook/specs/cosmoswasm.json,./cookbook/specs/tendermint.json,./cookbook/specs/cosmossdk.json,./cookbook/specs/cosmossdk_45.json,./cookbook/specs/cosmossdk_full.json,./cookbook/specs/ethermint.json,./cookbook/specs/ethereum.json,./cookbook/specs/cosmoshub.json,./cookbook/specs/lava.json,./cookbook/specs/osmosis.json,./cookbook/specs/fantom.json,./cookbook/specs/celo.json,./cookbook/specs/optimism.json,./cookbook/specs/arbitrum.json,./cookbook/specs/starknet.json,./cookbook/specs/aptos.json,./cookbook/specs/juno.json,./cookbook/specs/polygon.json,./cookbook/specs/evmos.json,./cookbook/specs/base.json,./cookbook/specs/canto.json,./cookbook/specs/sui.json,./cookbook/specs/solana.json,./cookbook/specs/bsc.json,./cookbook/specs/axelar.json,./cookbook/specs/avalanche.json,./cookbook/specs/fvm.json --lava-dev-test -y --from alice --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE &
Expand All @@ -41,7 +42,7 @@ PROVIDER1_LISTENER="127.0.0.1:2221"
# static configuration
PROVIDER4_LISTENER="127.0.0.1:2220"

lavad tx subscription buy DefaultPlan $(lavad keys show user1 -a) -y --from user1 --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE
# lavad tx subscription buy DefaultPlan $(lavad keys show user1 -a) -y --from user1 --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE
wait_next_block
lavad tx pairing stake-provider "LAV1" $PROVIDERSTAKE "$PROVIDER1_LISTENER,1" --delegate-limit 0ulava 1 $(operator_address) -y --from servicer1 --provider-moniker "dummyMoniker" --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE

Expand Down

0 comments on commit 7c637a0

Please sign in to comment.