Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adding provider optimizer tiers #1679

Merged
merged 32 commits into from
Sep 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
346b559
added selection weights, wip adding tiers
omerlavanet Sep 8, 2024
57d0474
finished tiering in the optimizer, needs tests
omerlavanet Sep 8, 2024
85a0731
Merge branch 'main' into add-optimizer-tiers
omerlavanet Sep 9, 2024
393214a
added tests for tiering and bug fixes
omerlavanet Sep 9, 2024
034c194
Merge branch 'add-optimizer-tiers' of github.com:lavanet/lava into ad…
omerlavanet Sep 9, 2024
e7c817b
added tier selection tests
omerlavanet Sep 9, 2024
f9d5846
added selection weight unitests
omerlavanet Sep 9, 2024
0e059af
added shift tier chance to selection tier
omerlavanet Sep 9, 2024
1c547b3
adjusted optimizer to work without perturbation
omerlavanet Sep 9, 2024
c3126d8
fixes
omerlavanet Sep 10, 2024
abbc89d
adding optimizer flags
ranlavanet Sep 10, 2024
b5ab702
improved formulas for selection, added unitests
omerlavanet Sep 10, 2024
0db576d
Merge branch 'add-optimizer-tiers' of github.com:lavanet/lava into ad…
omerlavanet Sep 10, 2024
7387800
added unitests
omerlavanet Sep 10, 2024
39cfec9
fix exploration chance
omerlavanet Sep 11, 2024
da454fe
refactor optimizer to allow extracting tiers without choosing a provi…
omerlavanet Sep 11, 2024
bcc1721
finished fixing old optimizer tests
omerlavanet Sep 11, 2024
4276078
added unitests, added code to give stake handicap for higher tiers be…
omerlavanet Sep 11, 2024
59c0786
added more unitests
omerlavanet Sep 11, 2024
8c65666
lint
omerlavanet Sep 11, 2024
c7362be
lint
omerlavanet Sep 11, 2024
41a11f1
Merge branch 'main' into add-optimizer-tiers
omerlavanet Sep 11, 2024
2b116a2
comments
omerlavanet Sep 11, 2024
ba38a7c
Merge branch 'add-optimizer-tiers' of github.com:lavanet/lava into ad…
omerlavanet Sep 11, 2024
94d9e0c
Merge remote-tracking branch 'origin/main' into add-optimizer-tiers
omerlavanet Sep 12, 2024
30a7dce
fix unitests failing when running concurrently
omerlavanet Sep 12, 2024
cd1b3ef
Merge branch 'main' into add-optimizer-tiers
omerlavanet Sep 12, 2024
532e8d4
support not setting stake (usually in tests) in pairing
omerlavanet Sep 12, 2024
f166150
Merge branch 'add-optimizer-tiers' of github.com:lavanet/lava into ad…
omerlavanet Sep 12, 2024
dd307cd
now actually solve the uninitialized stake
omerlavanet Sep 12, 2024
f3b2489
remove duplication of batches
omerlavanet Sep 13, 2024
e3e9c78
Merge branch 'main' into add-optimizer-tiers
omerlavanet Sep 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions protocol/common/cobra_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ const (
// This feature is suppose to help with successful relays in some chains that return node errors on rare race conditions on the serviced chains.
SetRelayCountOnNodeErrorFlag = "set-retry-count-on-node-error"
UseStaticSpecFlag = "use-static-spec" // allows the user to manually load a spec providing a path, this is useful to test spec changes before they hit the blockchain

// optimizer flags
SetProviderOptimizerBestTierPickChance = "set-provider-optimizer-best-tier-pick-chance"
SetProviderOptimizerWorstTierPickChance = "set-provider-optimizer-worst-tier-pick-chance"
SetProviderOptimizerNumberOfTiersToCreate = "set-provider-optimizer-number-of-tiers-to-create"
)

const (
Expand Down
3 changes: 2 additions & 1 deletion protocol/lavasession/consumer_session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func (csm *ConsumerSessionManager) UpdateAllProviders(epoch uint64, pairingList
csm.setValidAddressesToDefaultValue("", nil) // the starting point is that valid addresses are equal to pairing addresses.
// reset session related metrics
csm.consumerMetricsManager.ResetSessionRelatedMetrics()
csm.providerOptimizer.UpdateWeights(CalcWeightsByStake(pairingList))
utils.LavaFormatDebug("updated providers", utils.Attribute{Key: "epoch", Value: epoch}, utils.Attribute{Key: "spec", Value: csm.rpcEndpoint.Key()})
return nil
}
Expand Down Expand Up @@ -638,7 +639,7 @@ func (csm *ConsumerSessionManager) getValidProviderAddresses(ignoredProvidersLis
if stateful == common.CONSISTENCY_SELECT_ALL_PROVIDERS && csm.providerOptimizer.Strategy() != provideroptimizer.STRATEGY_COST {
providers = csm.getTopTenProvidersForStatefulCalls(validAddresses, ignoredProvidersList)
} else {
providers = csm.providerOptimizer.ChooseProvider(validAddresses, ignoredProvidersList, cu, requestedBlock, OptimizerPerturbation)
providers, _ = csm.providerOptimizer.ChooseProvider(validAddresses, ignoredProvidersList, cu, requestedBlock)
}

utils.LavaFormatTrace("Choosing providers",
Expand Down
29 changes: 28 additions & 1 deletion protocol/lavasession/consumer_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ const (
AllowInsecureConnectionToProvidersFlag = "allow-insecure-provider-dialing"
AllowGRPCCompressionFlag = "allow-grpc-compression-for-consumer-provider-communication"
maximumStreamsOverASingleConnection = 100
WeightMultiplierForStaticProviders = 10
)

var (
Expand Down Expand Up @@ -72,9 +73,10 @@ type ProviderOptimizer interface {
AppendProbeRelayData(providerAddress string, latency time.Duration, success bool)
AppendRelayFailure(providerAddress string)
AppendRelayData(providerAddress string, latency time.Duration, isHangingApi bool, cu, syncBlock uint64)
ChooseProvider(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64, perturbationPercentage float64) (addresses []string)
ChooseProvider(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64) (addresses []string, tier int)
GetExcellenceQoSReportForProvider(string) (*pairingtypes.QualityOfServiceReport, *pairingtypes.QualityOfServiceReport)
Strategy() provideroptimizer.Strategy
UpdateWeights(map[string]int64)
}

type ignoredProviders struct {
Expand Down Expand Up @@ -595,3 +597,28 @@ func CalculateAvailabilityScore(qosReport *QoSReport) (downtimePercentageRet, sc
scaledAvailabilityScore := sdk.MaxDec(sdk.ZeroDec(), AvailabilityPercentage.Sub(downtimePercentage).Quo(AvailabilityPercentage))
return downtimePercentage, scaledAvailabilityScore
}

func CalcWeightsByStake(providers map[uint64]*ConsumerSessionsWithProvider) (weights map[string]int64) {
weights = make(map[string]int64)
staticProviders := make([]*ConsumerSessionsWithProvider, 0)
maxWeight := int64(1)
for _, cswp := range providers {
if cswp.StaticProvider {
staticProviders = append(staticProviders, cswp)
continue
}
stakeAmount := cswp.getProviderStakeSize().Amount
stake := int64(10) // defaults to 10 if stake isn't set
if !stakeAmount.IsNil() && stakeAmount.IsInt64() {
stake = stakeAmount.Int64()
}
if stake > maxWeight {
maxWeight = stake
}
weights[cswp.PublicLavaAddress] = stake
}
for _, cswp := range staticProviders {
weights[cswp.PublicLavaAddress] = maxWeight * WeightMultiplierForStaticProviders
}
return weights
}
128 changes: 87 additions & 41 deletions protocol/provideroptimizer/provider_optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ const (
WANTED_PRECISION = int64(8)
)

var (
OptimizerNumTiers = 4
MinimumEntries = 5
ATierChance = 0.75
LastTierChance = 0.0
)

type ConcurrentBlockStore struct {
Lock sync.Mutex
Time time.Time
Expand All @@ -49,6 +56,13 @@ type ProviderOptimizer struct {
baseWorldLatency time.Duration
wantedNumProvidersInConcurrency uint
latestSyncData ConcurrentBlockStore
selectionWeighter SelectionWeighter
OptimizerNumTiers int
}

type Exploration struct {
address string
time time.Time
}

type ProviderData struct {
Expand All @@ -72,6 +86,10 @@ const (
STRATEGY_DISTRIBUTED
)

func (po *ProviderOptimizer) UpdateWeights(weights map[string]int64) {
po.selectionWeighter.SetWeights(weights)
}

func (po *ProviderOptimizer) AppendRelayFailure(providerAddress string) {
po.appendRelayData(providerAddress, 0, false, false, 0, 0, time.Now())
}
Expand Down Expand Up @@ -131,16 +149,12 @@ func (po *ProviderOptimizer) AppendProbeRelayData(providerAddress string, latenc
)
}

// returns a sub set of selected providers according to their scores, perturbation factor will be added to each score in order to randomly select providers that are not always on top
func (po *ProviderOptimizer) ChooseProvider(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64, perturbationPercentage float64) (addresses []string) {
returnedProviders := make([]string, 1) // location 0 is always the best score
latencyScore := math.MaxFloat64 // smaller = better i.e less latency
syncScore := math.MaxFloat64 // smaller = better i.e less sync lag
numProviders := len(allAddresses)
if po.strategy == STRATEGY_DISTRIBUTED {
// distribute relays across more providers
perturbationPercentage *= 2
}
func (po *ProviderOptimizer) CalculateSelectionTiers(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64) (SelectionTier, Exploration) {
latencyScore := math.MaxFloat64 // smaller = better i.e less latency
syncScore := math.MaxFloat64 // smaller = better i.e less sync lag

explorationCandidate := Exploration{address: "", time: time.Now().Add(time.Hour)}
selectionTier := NewSelectionTier()
for _, providerAddress := range allAddresses {
if _, ok := ignoredProviders[providerAddress]; ok {
// ignored provider, skip it
Expand All @@ -152,16 +166,12 @@ func (po *ProviderOptimizer) ChooseProvider(allAddresses []string, ignoredProvid
}
// latency score
latencyScoreCurrent := po.calculateLatencyScore(providerData, cu, requestedBlock) // smaller == better i.e less latency
// latency perturbation
latencyScoreCurrent = pertrubWithNormalGaussian(latencyScoreCurrent, perturbationPercentage)

// sync score
syncScoreCurrent := float64(0)
if requestedBlock < 0 {
// means user didn't ask for a specific block and we want to give him the best
syncScoreCurrent = po.calculateSyncScore(providerData.Sync) // smaller == better i.e less sync lag
// sync perturbation
syncScoreCurrent = pertrubWithNormalGaussian(syncScoreCurrent, perturbationPercentage)
}

utils.LavaFormatTrace("scores information",
Expand All @@ -171,29 +181,51 @@ func (po *ProviderOptimizer) ChooseProvider(allAddresses []string, ignoredProvid
utils.LogAttr("latencyScore", latencyScore),
utils.LogAttr("syncScore", syncScore),
)

// we want the minimum latency and sync diff
if po.isBetterProviderScore(latencyScore, latencyScoreCurrent, syncScore, syncScoreCurrent) || len(returnedProviders) == 0 {
if returnedProviders[0] != "" && po.shouldExplore(len(returnedProviders), numProviders) {
// we are about to overwrite position 0, and this provider needs a chance to be in exploration
returnedProviders = append(returnedProviders, returnedProviders[0])
}
returnedProviders[0] = providerAddress // best provider is always on position 0
latencyScore = latencyScoreCurrent
syncScore = syncScoreCurrent
continue
}
if po.shouldExplore(len(returnedProviders), numProviders) {
returnedProviders = append(returnedProviders, providerAddress)
providerScore := po.calcProviderScore(latencyScoreCurrent, syncScoreCurrent)
selectionTier.AddScore(providerAddress, providerScore)

// check if candidate for exploration
updateTime := providerData.Latency.Time
if updateTime.Add(10*time.Second).Before(time.Now()) && updateTime.Before(explorationCandidate.time) {
// if the provider didn't update its data for 10 seconds, it is a candidate for exploration
explorationCandidate = Exploration{address: providerAddress, time: updateTime}
}
}
return selectionTier, explorationCandidate
}

utils.LavaFormatTrace("returned providers",
// returns a sub set of selected providers according to their scores, perturbation factor will be added to each score in order to randomly select providers that are not always on top
func (po *ProviderOptimizer) ChooseProvider(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64) (addresses []string, tier int) {
selectionTier, explorationCandidate := po.CalculateSelectionTiers(allAddresses, ignoredProviders, cu, requestedBlock)
if selectionTier.ScoresCount() == 0 {
// no providers to choose from
return []string{}, -1
}
initialChances := map[int]float64{0: ATierChance}
if selectionTier.ScoresCount() < po.OptimizerNumTiers {
po.OptimizerNumTiers = selectionTier.ScoresCount()
}
if selectionTier.ScoresCount() >= MinimumEntries*2 {
// if we have more than 2*MinimumEntries we set the LastTierChance configured
initialChances[(po.OptimizerNumTiers - 1)] = LastTierChance
}
shiftedChances := selectionTier.ShiftTierChance(po.OptimizerNumTiers, initialChances)
tier = selectionTier.SelectTierRandomly(po.OptimizerNumTiers, shiftedChances)
tierProviders := selectionTier.GetTier(tier, po.OptimizerNumTiers, MinimumEntries)
// TODO: add penalty if a provider is chosen too much
selectedProvider := po.selectionWeighter.WeightedChoice(tierProviders)
returnedProviders := []string{selectedProvider}
if explorationCandidate.address != "" && po.shouldExplore(1, selectionTier.ScoresCount()) {
returnedProviders = append(returnedProviders, explorationCandidate.address)
}
utils.LavaFormatTrace("[Optimizer] returned providers",
utils.LogAttr("providers", strings.Join(returnedProviders, ",")),
utils.LogAttr("cu", cu),
utils.LogAttr("shiftedChances", shiftedChances),
utils.LogAttr("tier", tier),
)

return returnedProviders
return returnedProviders, tier
}

// calculate the expected average time until this provider catches up with the given latestSync block
Expand Down Expand Up @@ -242,30 +274,35 @@ func (po *ProviderOptimizer) shouldExplore(currentNumProvders, numProviders int)
case STRATEGY_PRIVACY:
return false // only one at a time
}
// Dividing the random threshold by the loop count ensures that the overall probability of success is the requirement for the entire loop not per iteration
return rand.Float64() < explorationChance/float64(numProviders)
return rand.Float64() < explorationChance
}

func (po *ProviderOptimizer) isBetterProviderScore(latencyScore, latencyScoreCurrent, syncScore, syncScoreCurrent float64) bool {
var latencyWeight float64
switch po.strategy {
case STRATEGY_LATENCY:
latencyWeight = 0.7
case STRATEGY_SYNC_FRESHNESS:
latencyWeight = 0.2
case STRATEGY_PRIVACY:
// pick at random regardless of score
if rand.Intn(2) == 0 {
return true
}
return false
default:
latencyWeight = 0.6
}
if syncScoreCurrent == 0 {
return latencyScore > latencyScoreCurrent
}
return latencyScore*latencyWeight+syncScore*(1-latencyWeight) > latencyScoreCurrent*latencyWeight+syncScoreCurrent*(1-latencyWeight)
return po.calcProviderScore(latencyScore, syncScore) > po.calcProviderScore(latencyScoreCurrent, syncScoreCurrent)
}

func (po *ProviderOptimizer) calcProviderScore(latencyScore, syncScore float64) float64 {
var latencyWeight float64
switch po.strategy {
case STRATEGY_LATENCY:
latencyWeight = 0.7
case STRATEGY_SYNC_FRESHNESS:
latencyWeight = 0.2
default:
latencyWeight = 0.6
}
return latencyScore*latencyWeight + syncScore*(1-latencyWeight)
}

func (po *ProviderOptimizer) calculateSyncScore(syncScore score.ScoreStore) float64 {
Expand Down Expand Up @@ -469,7 +506,16 @@ func NewProviderOptimizer(strategy Strategy, averageBlockTIme, baseWorldLatency
// overwrite
wantedNumProvidersInConcurrency = 1
}
return &ProviderOptimizer{strategy: strategy, providersStorage: cache, averageBlockTime: averageBlockTIme, baseWorldLatency: baseWorldLatency, providerRelayStats: relayCache, wantedNumProvidersInConcurrency: wantedNumProvidersInConcurrency}
return &ProviderOptimizer{
strategy: strategy,
providersStorage: cache,
averageBlockTime: averageBlockTIme,
baseWorldLatency: baseWorldLatency,
providerRelayStats: relayCache,
wantedNumProvidersInConcurrency: wantedNumProvidersInConcurrency,
selectionWeighter: NewSelectionWeighter(),
OptimizerNumTiers: OptimizerNumTiers,
}
}

// calculate the probability a random variable with a poisson distribution
Expand Down
Loading
Loading