Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adding provider optimizer tiers #1679

Merged
merged 32 commits into from
Sep 15, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
346b559
added selection weights, wip adding tiers
omerlavanet Sep 8, 2024
57d0474
finished tiering in the optimizer, needs tests
omerlavanet Sep 8, 2024
85a0731
Merge branch 'main' into add-optimizer-tiers
omerlavanet Sep 9, 2024
393214a
added tests for tiering and bug fixes
omerlavanet Sep 9, 2024
034c194
Merge branch 'add-optimizer-tiers' of github.com:lavanet/lava into ad…
omerlavanet Sep 9, 2024
e7c817b
added tier selection tests
omerlavanet Sep 9, 2024
f9d5846
added selection weight unitests
omerlavanet Sep 9, 2024
0e059af
added shift tier chance to selection tier
omerlavanet Sep 9, 2024
1c547b3
adjusted optimizer to work without perturbation
omerlavanet Sep 9, 2024
c3126d8
fixes
omerlavanet Sep 10, 2024
abbc89d
adding optimizer flags
ranlavanet Sep 10, 2024
b5ab702
improved formulas for selection, added unitests
omerlavanet Sep 10, 2024
0db576d
Merge branch 'add-optimizer-tiers' of github.com:lavanet/lava into ad…
omerlavanet Sep 10, 2024
7387800
added unitests
omerlavanet Sep 10, 2024
39cfec9
fix exploration chance
omerlavanet Sep 11, 2024
da454fe
refactor optimizer to allow extracting tiers without choosing a provi…
omerlavanet Sep 11, 2024
bcc1721
finished fixing old optimizer tests
omerlavanet Sep 11, 2024
4276078
added unitests, added code to give stake handicap for higher tiers be…
omerlavanet Sep 11, 2024
59c0786
added more unitests
omerlavanet Sep 11, 2024
8c65666
lint
omerlavanet Sep 11, 2024
c7362be
lint
omerlavanet Sep 11, 2024
41a11f1
Merge branch 'main' into add-optimizer-tiers
omerlavanet Sep 11, 2024
2b116a2
comments
omerlavanet Sep 11, 2024
ba38a7c
Merge branch 'add-optimizer-tiers' of github.com:lavanet/lava into ad…
omerlavanet Sep 11, 2024
94d9e0c
Merge remote-tracking branch 'origin/main' into add-optimizer-tiers
omerlavanet Sep 12, 2024
30a7dce
fix unitests failing when running concurrently
omerlavanet Sep 12, 2024
cd1b3ef
Merge branch 'main' into add-optimizer-tiers
omerlavanet Sep 12, 2024
532e8d4
support not setting stake (usually in tests) in pairing
omerlavanet Sep 12, 2024
f166150
Merge branch 'add-optimizer-tiers' of github.com:lavanet/lava into ad…
omerlavanet Sep 12, 2024
dd307cd
now actually solve the uninitialized stake
omerlavanet Sep 12, 2024
f3b2489
remove duplication of batches
omerlavanet Sep 13, 2024
e3e9c78
Merge branch 'main' into add-optimizer-tiers
omerlavanet Sep 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions protocol/lavasession/consumer_session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func (csm *ConsumerSessionManager) UpdateAllProviders(epoch uint64, pairingList
csm.setValidAddressesToDefaultValue("", nil) // the starting point is that valid addresses are equal to pairing addresses.
// reset session related metrics
csm.consumerMetricsManager.ResetSessionRelatedMetrics()
csm.providerOptimizer.UpdateWeights(CalcWeightsByStake(pairingList))
utils.LavaFormatDebug("updated providers", utils.Attribute{Key: "epoch", Value: epoch}, utils.Attribute{Key: "spec", Value: csm.rpcEndpoint.Key()})
return nil
}
Expand Down
22 changes: 22 additions & 0 deletions protocol/lavasession/consumer_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type ProviderOptimizer interface {
ChooseProvider(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64, perturbationPercentage float64) (addresses []string)
GetExcellenceQoSReportForProvider(string) (*pairingtypes.QualityOfServiceReport, *pairingtypes.QualityOfServiceReport)
Strategy() provideroptimizer.Strategy
UpdateWeights(map[string]int64)
}

type ignoredProviders struct {
Expand Down Expand Up @@ -595,3 +596,24 @@ func CalculateAvailabilityScore(qosReport *QoSReport) (downtimePercentageRet, sc
scaledAvailabilityScore := sdk.MaxDec(sdk.ZeroDec(), AvailabilityPercentage.Sub(downtimePercentage).Quo(AvailabilityPercentage))
return downtimePercentage, scaledAvailabilityScore
}

func CalcWeightsByStake(providers map[uint64]*ConsumerSessionsWithProvider) (weights map[string]int64) {
weights = make(map[string]int64)
staticProviders := make([]*ConsumerSessionsWithProvider, 0)
maxWeight := int64(1)
for _, cswp := range providers {
if cswp.StaticProvider {
staticProviders = append(staticProviders, cswp)
continue
}
stake := cswp.getProviderStakeSize().Amount.Int64()
if stake > maxWeight {
maxWeight = stake
}
weights[cswp.PublicLavaAddress] = stake
}
for _, cswp := range staticProviders {
weights[cswp.PublicLavaAddress] = maxWeight * 10
omerlavanet marked this conversation as resolved.
Show resolved Hide resolved
}
return weights
}
86 changes: 58 additions & 28 deletions protocol/provideroptimizer/provider_optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ const (
WANTED_PRECISION = int64(8)
)

var (
NumTiers = 4
MinimumEntries = 5
ATierChance = 0.75
LastTierChance = 0.0
)

type ConcurrentBlockStore struct {
Lock sync.Mutex
Time time.Time
Expand All @@ -49,6 +56,7 @@ type ProviderOptimizer struct {
baseWorldLatency time.Duration
wantedNumProvidersInConcurrency uint
latestSyncData ConcurrentBlockStore
selectionWeighter SelectionWeighter
}

type ProviderData struct {
Expand All @@ -72,6 +80,10 @@ const (
STRATEGY_DISTRIBUTED
)

func (po *ProviderOptimizer) UpdateWeights(weights map[string]int64) {
po.selectionWeighter.SetWeights(weights)
}

func (po *ProviderOptimizer) AppendRelayFailure(providerAddress string) {
po.appendRelayData(providerAddress, 0, false, false, 0, 0, time.Now())
}
Expand Down Expand Up @@ -133,14 +145,18 @@ func (po *ProviderOptimizer) AppendProbeRelayData(providerAddress string, latenc

// returns a sub set of selected providers according to their scores, perturbation factor will be added to each score in order to randomly select providers that are not always on top
func (po *ProviderOptimizer) ChooseProvider(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64, perturbationPercentage float64) (addresses []string) {
returnedProviders := make([]string, 1) // location 0 is always the best score
latencyScore := math.MaxFloat64 // smaller = better i.e less latency
syncScore := math.MaxFloat64 // smaller = better i.e less sync lag
numProviders := len(allAddresses)
latencyScore := math.MaxFloat64 // smaller = better i.e less latency
syncScore := math.MaxFloat64 // smaller = better i.e less sync lag
if po.strategy == STRATEGY_DISTRIBUTED {
// distribute relays across more providers
perturbationPercentage *= 2
}
type exploration struct {
address string
time time.Time
}
explorationCandidate := exploration{address: "", time: time.Now().Add(time.Hour)}
selectionTier := NewSelectionTier()
for _, providerAddress := range allAddresses {
if _, ok := ignoredProviders[providerAddress]; ok {
// ignored provider, skip it
Expand Down Expand Up @@ -171,23 +187,23 @@ func (po *ProviderOptimizer) ChooseProvider(allAddresses []string, ignoredProvid
utils.LogAttr("latencyScore", latencyScore),
utils.LogAttr("syncScore", syncScore),
)

// we want the minimum latency and sync diff
if po.isBetterProviderScore(latencyScore, latencyScoreCurrent, syncScore, syncScoreCurrent) || len(returnedProviders) == 0 {
if returnedProviders[0] != "" && po.shouldExplore(len(returnedProviders), numProviders) {
// we are about to overwrite position 0, and this provider needs a chance to be in exploration
returnedProviders = append(returnedProviders, returnedProviders[0])
}
returnedProviders[0] = providerAddress // best provider is always on position 0
latencyScore = latencyScoreCurrent
syncScore = syncScoreCurrent
continue
}
if po.shouldExplore(len(returnedProviders), numProviders) {
returnedProviders = append(returnedProviders, providerAddress)
providerScore := po.calcProviderScore(latencyScoreCurrent, syncScoreCurrent)
selectionTier.AddScore(providerAddress, providerScore)

// check if candidate for exploration
updateTime := providerData.Latency.Time
if updateTime.Add(30*time.Second).Before(time.Now()) && updateTime.Before(explorationCandidate.time) {
// if the provider didn't update its data for 30 seconds, it is a candidate for exploration
explorationCandidate = exploration{address: providerAddress, time: updateTime}
}
}

tier := selectionTier.SelectTierRandomly(NumTiers, map[int]float64{0: ATierChance, (NumTiers - 1): LastTierChance})
tierProviders := selectionTier.GetTier(tier, NumTiers, MinimumEntries)
selectedProvider := po.selectionWeighter.WeightedChoice(tierProviders)
returnedProviders := []string{selectedProvider}
if explorationCandidate.address != "" && po.shouldExplore(1, len(allAddresses)) {
returnedProviders = append(returnedProviders, explorationCandidate.address)
}
utils.LavaFormatTrace("returned providers",
utils.LogAttr("providers", strings.Join(returnedProviders, ",")),
utils.LogAttr("cu", cu),
Expand Down Expand Up @@ -247,25 +263,31 @@ func (po *ProviderOptimizer) shouldExplore(currentNumProvders, numProviders int)
}

func (po *ProviderOptimizer) isBetterProviderScore(latencyScore, latencyScoreCurrent, syncScore, syncScoreCurrent float64) bool {
var latencyWeight float64
switch po.strategy {
case STRATEGY_LATENCY:
latencyWeight = 0.7
case STRATEGY_SYNC_FRESHNESS:
latencyWeight = 0.2
case STRATEGY_PRIVACY:
// pick at random regardless of score
if rand.Intn(2) == 0 {
return true
}
return false
default:
latencyWeight = 0.6
}
if syncScoreCurrent == 0 {
return latencyScore > latencyScoreCurrent
}
return latencyScore*latencyWeight+syncScore*(1-latencyWeight) > latencyScoreCurrent*latencyWeight+syncScoreCurrent*(1-latencyWeight)
return po.calcProviderScore(latencyScore, syncScore) > po.calcProviderScore(latencyScoreCurrent, syncScoreCurrent)
}

func (po *ProviderOptimizer) calcProviderScore(latencyScore, syncScore float64) float64 {
var latencyWeight float64
switch po.strategy {
case STRATEGY_LATENCY:
latencyWeight = 0.7
case STRATEGY_SYNC_FRESHNESS:
latencyWeight = 0.2
default:
latencyWeight = 0.6
}
return latencyScore*latencyWeight + syncScore*(1-latencyWeight)
}

func (po *ProviderOptimizer) calculateSyncScore(syncScore score.ScoreStore) float64 {
Expand Down Expand Up @@ -469,7 +491,15 @@ func NewProviderOptimizer(strategy Strategy, averageBlockTIme, baseWorldLatency
// overwrite
wantedNumProvidersInConcurrency = 1
}
return &ProviderOptimizer{strategy: strategy, providersStorage: cache, averageBlockTime: averageBlockTIme, baseWorldLatency: baseWorldLatency, providerRelayStats: relayCache, wantedNumProvidersInConcurrency: wantedNumProvidersInConcurrency}
return &ProviderOptimizer{
strategy: strategy,
providersStorage: cache,
averageBlockTime: averageBlockTIme,
baseWorldLatency: baseWorldLatency,
providerRelayStats: relayCache,
wantedNumProvidersInConcurrency: wantedNumProvidersInConcurrency,
selectionWeighter: NewSelectionWeighter(),
}
}

// calculate the probability a random variable with a poisson distribution
Expand Down
115 changes: 115 additions & 0 deletions protocol/provideroptimizer/selection_tier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package provideroptimizer

import (
"github.com/lavanet/lava/v3/utils/rand"
)

type Entry struct {
Address string
Score float64
Part float64
}

type SelectionTier interface {
AddScore(entry string, score float64)
GetTier(tier int, numTiers int, minimumEntries int) []Entry
SelectTierRandomly(numTiers int, tierChances map[int]float64) int
}

type SelectionTierInst struct {
scores []Entry
}

func NewSelectionTier() SelectionTier {
return &SelectionTierInst{scores: []Entry{}}
}

func (st *SelectionTierInst) AddScore(entry string, score float64) {
// add the score to the scores list for the entry while keeping it sorted in ascending order
// this means that the highest score will be at the front of the list, tier 0 is highest scores
newEntry := Entry{Address: entry, Score: score, Part: 1}
// find the correct position to insert the new entry

for i, existingEntry := range st.scores {
if existingEntry.Address == entry {
// overwrite the existing entry
st.scores[i].Score = score
return
}
if score <= existingEntry.Score {
st.scores = append(st.scores[:i], append([]Entry{newEntry}, st.scores[i:]...)...)
return
}
}
// it's not smaller than any existing entry, so add it to the end
st.scores = append(st.scores, newEntry)
}

func (st *SelectionTierInst) SelectTierRandomly(numTiers int, tierChances map[int]float64) int {
// select a tier randomly based on the chances given
// if the chances are not given, select a tier randomly based on the number of tiers
if len(tierChances) == 0 {
return rand.Intn(numTiers)
}
// calculate the total chance
totalChance := 0.0
for _, chance := range tierChances {
totalChance += chance
}
chanceForDefaultTiers := (1 - totalChance) / float64(numTiers-len(tierChances))
// select a random number between 0 and 1
randChance := rand.Float64()
// find the tier that the random chance falls into
currentChance := 0.0
for i := 0; i < numTiers; i++ {
if chance, ok := tierChances[i]; ok {
currentChance += chance
} else {
currentChance += chanceForDefaultTiers
}
if randChance < currentChance {
return i
}
}
// default, should never happen
return 0
}

func (st *SelectionTierInst) GetTier(tier int, numTiers int, minimumEntries int) []Entry {
// get the tier of scores for the given tier and number of tiers
entriesLen := len(st.scores)
if entriesLen < minimumEntries || numTiers == 0 || tier >= numTiers {
return st.scores
}

start, end, fracStart, fracEnd := getPositionsForTier(tier, numTiers, entriesLen)
if end < minimumEntries {
return st.scores[:minimumEntries]
}
ret := st.scores[start:end]
if len(ret) >= minimumEntries {
// apply the relative parts to the first and last entries
ret[0].Part = 1 - fracStart
ret[len(ret)-1].Part = fracEnd
return ret
}
// bring in entries from better tiers if insufficient
// end is > minimumEntries, and end - start < minimumEntries
entriesToTake := minimumEntries - len(ret)
entriesToTakeStart := start - entriesToTake
ret = append(st.scores[entriesToTakeStart:start], ret...)
return ret
}

func getPositionsForTier(tier int, numTiers int, entriesLen int) (start int, end int, fracStart float64, fracEnd float64) {
rankStart := float64(tier) / float64(numTiers)
rankEnd := float64(tier+1) / float64(numTiers)
// Calculate the position based on the rank
startPositionF := (float64(entriesLen-1) * rankStart)
endPositionF := (float64(entriesLen-1) * rankEnd)

positionStart := int(startPositionF)
positionEnd := int(endPositionF) + 1

return positionStart, positionEnd, startPositionF - float64(positionStart), float64(positionEnd) - endPositionF
}
Loading
Loading