Skip to content

Commit

Permalink
apply optimizer refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
oren-lava committed Dec 9, 2024
1 parent ac2bccf commit 82a5c31
Show file tree
Hide file tree
Showing 21 changed files with 1,137 additions and 2,675 deletions.
3 changes: 1 addition & 2 deletions protocol/chainlib/consumer_ws_subscription_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -722,10 +722,9 @@ func TestConsumerWSSubscriptionManager(t *testing.T) {

func CreateConsumerSessionManager(chainID, apiInterface, consumerPublicAddress string) *lavasession.ConsumerSessionManager {
rand.InitRandomSeed()
baseLatency := common.AverageWorldLatency / 2 // we want performance to be half our timeout or better
return lavasession.NewConsumerSessionManager(
&lavasession.RPCEndpoint{NetworkAddress: "stub", ChainID: chainID, ApiInterface: apiInterface, TLSEnabled: false, HealthCheckPath: "/", Geolocation: 0},
provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, 0, baseLatency, 1, nil, "dontcare"),
provideroptimizer.NewProviderOptimizer(provideroptimizer.StrategyBalanced, 0, 1, nil, "dontcare"),
nil, nil, consumerPublicAddress,
lavasession.NewActiveSubscriptionProvidersStorage(),
)
Expand Down
3 changes: 1 addition & 2 deletions protocol/integration/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,7 @@ func createRpcConsumer(t *testing.T, ctx context.Context, rpcConsumerOptions rpc
consumerStateTracker := &mockConsumerStateTracker{}
finalizationConsensus := finalizationconsensus.NewFinalizationConsensus(rpcEndpoint.ChainID)
_, averageBlockTime, _, _ := chainParser.ChainBlockStats()
baseLatency := common.AverageWorldLatency / 2
optimizer := provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, averageBlockTime, baseLatency, 2, nil, "dontcare")
optimizer := provideroptimizer.NewProviderOptimizer(provideroptimizer.StrategyBalanced, averageBlockTime, 2, nil, "dontcare")
consumerSessionManager := lavasession.NewConsumerSessionManager(rpcEndpoint, optimizer, nil, nil, "test", lavasession.NewActiveSubscriptionProvidersStorage())
consumerSessionManager.UpdateAllProviders(rpcConsumerOptions.epoch, rpcConsumerOptions.pairingList)

Expand Down
10 changes: 5 additions & 5 deletions protocol/lavasession/consumer_session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,15 +562,15 @@ func (csm *ConsumerSessionManager) GetSessions(ctx context.Context, cuNeededForS
sessionInfo.QoSSummeryResult = consumerSession.getQosComputedResultOrZero()
sessions[providerAddress] = sessionInfo

qosReport, rawQosReport := csm.providerOptimizer.GetExcellenceQoSReportForProvider(providerAddress)
qosReport, _ := csm.providerOptimizer.GetExcellenceQoSReportForProvider(providerAddress)
if csm.rpcEndpoint.Geolocation != uint64(endpoint.endpoint.Geolocation) {
// rawQosReport is used only when building the relay payment message to be used to update
// the provider's reputation on-chain. If the consumer and provider don't share geolocation
// (consumer geo: csm.rpcEndpoint.Geolocation, provider geo: endpoint.endpoint.Geolocation)
// we don't want to update the reputation by it, so we null the rawQosReport
rawQosReport = nil
qosReport = nil
}
consumerSession.SetUsageForSession(cuNeededForSession, qosReport, rawQosReport, usedProviders, routerKey)
consumerSession.SetUsageForSession(cuNeededForSession, qosReport, usedProviders, routerKey)
// We successfully added provider, we should ignore it if we need to fetch new
tempIgnoredProviders.providers[providerAddress] = struct{}{}
if len(sessions) == wantedSession {
Expand Down Expand Up @@ -640,7 +640,7 @@ func (csm *ConsumerSessionManager) getValidProviderAddresses(ignoredProvidersLis
}
}
var providers []string
if stateful == common.CONSISTENCY_SELECT_ALL_PROVIDERS && csm.providerOptimizer.Strategy() != provideroptimizer.STRATEGY_COST {
if stateful == common.CONSISTENCY_SELECT_ALL_PROVIDERS && csm.providerOptimizer.Strategy() != provideroptimizer.StrategyCost {
providers = csm.getTopTenProvidersForStatefulCalls(validAddresses, ignoredProvidersList)
} else {
providers, _ = csm.providerOptimizer.ChooseProvider(validAddresses, ignoredProvidersList, cu, requestedBlock)
Expand Down Expand Up @@ -1049,7 +1049,7 @@ func (csm *ConsumerSessionManager) OnSessionDone(
consumerSession.CalculateQoS(currentLatency, expectedLatency, expectedBH-latestServicedBlock, numOfProviders, int64(providersCount))
if !isHangingApi {
// append relay data only for non hanging apis
go csm.providerOptimizer.AppendRelayData(consumerSession.Parent.PublicLavaAddress, currentLatency, isHangingApi, specComputeUnits, uint64(latestServicedBlock))
go csm.providerOptimizer.AppendRelayData(consumerSession.Parent.PublicLavaAddress, currentLatency, specComputeUnits, uint64(latestServicedBlock))
}

csm.updateMetricsManager(consumerSession, currentLatency, !isHangingApi) // apply latency only for non hanging apis
Expand Down
3 changes: 1 addition & 2 deletions protocol/lavasession/consumer_session_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,7 @@ func TestEndpointSortingFlow(t *testing.T) {

func CreateConsumerSessionManager() *ConsumerSessionManager {
rand.InitRandomSeed()
baseLatency := common.AverageWorldLatency / 2 // we want performance to be half our timeout or better
return NewConsumerSessionManager(&RPCEndpoint{"stub", "stub", "stub", false, "/", 0}, provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, 0, baseLatency, 1, nil, "dontcare"), nil, nil, "lava@test", NewActiveSubscriptionProvidersStorage())
return NewConsumerSessionManager(&RPCEndpoint{"stub", "stub", "stub", false, "/", 0}, provideroptimizer.NewProviderOptimizer(provideroptimizer.StrategyBalanced, 0, 1, nil, "dontcare"), nil, nil, "lava@test", NewActiveSubscriptionProvidersStorage())
}

func TestMain(m *testing.M) {
Expand Down
4 changes: 2 additions & 2 deletions protocol/lavasession/consumer_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ type ConsumerSessionsMap map[string]*SessionInfo
type ProviderOptimizer interface {
AppendProbeRelayData(providerAddress string, latency time.Duration, success bool)
AppendRelayFailure(providerAddress string)
AppendRelayData(providerAddress string, latency time.Duration, isHangingApi bool, cu, syncBlock uint64)
AppendRelayData(providerAddress string, latency time.Duration, cu, syncBlock uint64)
ChooseProvider(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64) (addresses []string, tier int)
GetExcellenceQoSReportForProvider(string) (*pairingtypes.QualityOfServiceReport, *pairingtypes.QualityOfServiceReport)
GetExcellenceQoSReportForProvider(string) (*pairingtypes.QualityOfServiceReport, time.Time)
Strategy() provideroptimizer.Strategy
UpdateWeights(map[string]int64, uint64)
}
Expand Down
3 changes: 1 addition & 2 deletions protocol/lavasession/single_consumer_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,12 @@ func (cs *SingleConsumerSession) CalculateQoS(latency, expectedLatency time.Dura
}
}

func (scs *SingleConsumerSession) SetUsageForSession(cuNeededForSession uint64, qoSExcellenceReport *pairingtypes.QualityOfServiceReport, rawQoSExcellenceReport *pairingtypes.QualityOfServiceReport, usedProviders UsedProvidersInf, routerKey RouterKey) error {
func (scs *SingleConsumerSession) SetUsageForSession(cuNeededForSession uint64, qoSExcellenceReport *pairingtypes.QualityOfServiceReport, usedProviders UsedProvidersInf, routerKey RouterKey) error {
scs.LatestRelayCu = cuNeededForSession // set latestRelayCu
scs.RelayNum += RelayNumberIncrement // increase relayNum
if scs.RelayNum > 1 {
// we only set excellence for sessions with more than one successful relays, this guarantees data within the epoch exists
scs.QoSInfo.LastExcellenceQoSReport = qoSExcellenceReport
scs.QoSInfo.LastExcellenceQoSReportRaw = rawQoSExcellenceReport
}
scs.usedProviders = usedProviders
scs.routerKey = routerKey
Expand Down
Loading

0 comments on commit 82a5c31

Please sign in to comment.