Skip to content

Commit

Permalink
Add the consumer geolocation to the QoS server report
Browse files Browse the repository at this point in the history
  • Loading branch information
shleikes committed Dec 9, 2024
1 parent 4631607 commit 7bfb314
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 4 deletions.
6 changes: 5 additions & 1 deletion protocol/metrics/consumer_optimizer_qos_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type ConsumerOptimizerQoSClient struct {
currentEpoch atomic.Uint64
lock sync.RWMutex
reportsToSend []OptimizerQoSReportToSend
geoLocation uint64
}

type OptimizerQoSReport struct {
Expand All @@ -57,6 +58,7 @@ type OptimizerQoSReportToSend struct {
Epoch uint64 `json:"epoch"`
ProviderStake int64 `json:"provider_stake"`
EntryIndex int `json:"entry_index"`
GeoLocation uint64 `json:"geo_location"`
}

func (oqosr OptimizerQoSReportToSend) String() string {
Expand All @@ -71,7 +73,7 @@ type OptimizerInf interface {
CalculateQoSScoresForMetrics(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64) []*OptimizerQoSReport
}

func NewConsumerOptimizerQoSClient(consumerAddress, endpointAddress string, interval ...time.Duration) *ConsumerOptimizerQoSClient {
func NewConsumerOptimizerQoSClient(consumerAddress, endpointAddress string, geoLocation uint64, interval ...time.Duration) *ConsumerOptimizerQoSClient {
hostname, err := os.Hostname()
if err != nil {
utils.LavaFormatWarning("Error while getting hostname for ConsumerOptimizerQoSClient", err)
Expand All @@ -85,6 +87,7 @@ func NewConsumerOptimizerQoSClient(consumerAddress, endpointAddress string, inte
chainIdToProviderToRelaysCount: map[string]map[string]uint64{},
chainIdToProviderToNodeErrorsCount: map[string]map[string]uint64{},
chainIdToProviderToEpochToStake: map[string]map[string]map[uint64]int64{},
geoLocation: geoLocation,
}
}

Expand Down Expand Up @@ -145,6 +148,7 @@ func (coqc *ConsumerOptimizerQoSClient) appendOptimizerQoSReport(report *Optimiz
Epoch: epoch,
NodeErrorRate: coqc.calculateNodeErrorRate(chainId, report.ProviderAddress),
ProviderStake: coqc.getProviderChainStake(chainId, report.ProviderAddress, epoch),
GeoLocation: coqc.geoLocation,
}

coqc.queueSender.appendQueue(optimizerQoSReportToSend)
Expand Down
2 changes: 1 addition & 1 deletion protocol/provideroptimizer/provider_optimizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -781,7 +781,7 @@ func TestProviderOptimizerWithOptimizerQoSClient(t *testing.T) {

chainId := "dontcare"

consumerOptimizerQoSClient := metrics.NewConsumerOptimizerQoSClient("lava@test", mockHttpServer.URL, 1*time.Second)
consumerOptimizerQoSClient := metrics.NewConsumerOptimizerQoSClient("lava@test", mockHttpServer.URL, 1, 1*time.Second)
consumerOptimizerQoSClient.StartOptimizersQoSReportsCollecting(context.Background(), 900*time.Millisecond)

providerOptimizer := NewProviderOptimizer(STRATEGY_BALANCED, TEST_AVERAGE_BLOCK_TIME, TEST_BASE_WORLD_LATENCY, 10, consumerOptimizerQoSClient, chainId)
Expand Down
6 changes: 4 additions & 2 deletions protocol/rpcconsumer/rpcconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ type rpcConsumerStartOptions struct {
stateShare bool
refererData *chainlib.RefererData
staticProvidersList []*lavasession.RPCProviderEndpoint // define static providers as backup to lava providers
geoLocation uint64
}

func getConsumerAddressAndKeys(clientCtx client.Context) (sdk.AccAddress, *secp256k1.PrivateKey, error) {
Expand Down Expand Up @@ -174,8 +175,8 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt
consumerUsageServeManager := metrics.NewConsumerRelayServerClient(options.analyticsServerAddresses.RelayServerAddress) // start up relay server reporting
var consumerOptimizerQoSClient *metrics.ConsumerOptimizerQoSClient
if options.analyticsServerAddresses.OptimizerQoSAddress != "" || options.analyticsServerAddresses.OptimizerQoSListen {
consumerOptimizerQoSClient = metrics.NewConsumerOptimizerQoSClient(consumerAddr.String(), options.analyticsServerAddresses.OptimizerQoSAddress, metrics.OptimizerQosServerPushInterval) // start up optimizer qos client
consumerOptimizerQoSClient.StartOptimizersQoSReportsCollecting(ctx, metrics.OptimizerQosServerSamplingInterval) // start up optimizer qos client
consumerOptimizerQoSClient = metrics.NewConsumerOptimizerQoSClient(consumerAddr.String(), options.analyticsServerAddresses.OptimizerQoSAddress, options.geoLocation, metrics.OptimizerQosServerPushInterval) // start up optimizer qos client
consumerOptimizerQoSClient.StartOptimizersQoSReportsCollecting(ctx, metrics.OptimizerQosServerSamplingInterval)
}
consumerMetricsManager := metrics.NewConsumerMetricsManager(metrics.ConsumerMetricsManagerOptions{
NetworkAddress: options.analyticsServerAddresses.MetricsListenAddress,
Expand Down Expand Up @@ -716,6 +717,7 @@ rpcconsumer consumer_examples/full_consumer_example.yml --cache-be "127.0.0.1:77
rpcConsumerSharedState,
refererData,
staticProviderEndpoints,
geolocation,
})
return err
},
Expand Down

0 comments on commit 7bfb314

Please sign in to comment.