From b36f5486f7dd1c5fe4f1cc551e81df9fee48dc3e Mon Sep 17 00:00:00 2001 From: Ran Mishael <106548467+ranlavanet@users.noreply.github.com> Date: Mon, 25 Nov 2024 13:57:55 +0100 Subject: [PATCH] feat: PRT - add provider optimizer metrics listener (#1785) * WIP * PRT - add new flags for optimizer metrics --- protocol/common/cobra_common.go | 1 + protocol/common/endpoints.go | 1 + protocol/metrics/consumer_metrics_manager.go | 47 ++++++++++++++++++- .../metrics/consumer_optimizer_qos_client.go | 33 +++++++++---- protocol/rpcconsumer/rpcconsumer.go | 16 +++++-- protocol/rpcconsumer/rpcconsumer_server.go | 6 +++ .../pre_setups/init_lava_only_with_node.sh | 2 +- 7 files changed, 89 insertions(+), 17 deletions(-) diff --git a/protocol/common/cobra_common.go b/protocol/common/cobra_common.go index 338b003f67..74c8211b37 100644 --- a/protocol/common/cobra_common.go +++ b/protocol/common/cobra_common.go @@ -41,6 +41,7 @@ const ( // optimizer qos server flags OptimizerQosServerAddressFlag = "optimizer-qos-server-address" // address of the optimizer qos server to send the qos reports + OptimizerQosListenFlag = "optimizer-qos-listen" // enable listening for qos reports on metrics endpoint OptimizerQosServerPushIntervalFlag = "optimizer-qos-push-interval" // interval to push the qos reports to the optimizer qos server OptimizerQosServerSamplingIntervalFlag = "optimizer-qos-sampling-interval" // interval to sample the qos reports // websocket flags diff --git a/protocol/common/endpoints.go b/protocol/common/endpoints.go index 03998cc2ff..2379512708 100644 --- a/protocol/common/endpoints.go +++ b/protocol/common/endpoints.go @@ -29,6 +29,7 @@ const ( REPORTED_PROVIDERS_HEADER_NAME = "Lava-Reported-Providers" USER_REQUEST_TYPE = "lava-user-request-type" STATEFUL_API_HEADER = "lava-stateful-api" + REQUESTED_BLOCK_HEADER_NAME = "lava-parsed-requested-block" LAVA_IDENTIFIED_NODE_ERROR_HEADER = "lava-identified-node-error" LAVAP_VERSION_HEADER_NAME = "Lavap-Version" LAVA_CONSUMER_PROCESS_GUID = "lava-consumer-process-guid" diff --git a/protocol/metrics/consumer_metrics_manager.go b/protocol/metrics/consumer_metrics_manager.go index b3ac3e910e..9a77678a4d 100644 --- a/protocol/metrics/consumer_metrics_manager.go +++ b/protocol/metrics/consumer_metrics_manager.go @@ -1,6 +1,7 @@ package metrics import ( + "encoding/json" "fmt" "net/http" "sync" @@ -64,11 +65,14 @@ type ConsumerMetricsManager struct { relayProcessingLatencyBeforeProvider *prometheus.GaugeVec relayProcessingLatencyAfterProvider *prometheus.GaugeVec averageProcessingLatency map[string]*LatencyTracker + consumerOptimizerQoSClient *ConsumerOptimizerQoSClient } type ConsumerMetricsManagerOptions struct { - NetworkAddress string - AddMethodsApiGauge bool + NetworkAddress string + AddMethodsApiGauge bool + EnableQoSListener bool + ConsumerOptimizerQoSClient *ConsumerOptimizerQoSClient } func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerMetricsManager { @@ -270,9 +274,24 @@ func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerM relayProcessingLatencyBeforeProvider: relayProcessingLatencyBeforeProvider, relayProcessingLatencyAfterProvider: relayProcessingLatencyAfterProvider, averageProcessingLatency: map[string]*LatencyTracker{}, + consumerOptimizerQoSClient: options.ConsumerOptimizerQoSClient, } http.Handle("/metrics", promhttp.Handler()) + http.HandleFunc("/provider_optimizer_metrics", func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + reports := consumerMetricsManager.consumerOptimizerQoSClient.GetReportsToSend() + jsonData, err := json.Marshal(reports) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/json") + w.Write(jsonData) + }) overallHealthHandler := func(w http.ResponseWriter, r *http.Request) { statusCode := http.StatusOK @@ -545,3 +564,27 @@ func (pme *ConsumerMetricsManager) SetWsSubscriptioDisconnectRequestMetric(chain } pme.totalWsSubscriptionDissconnectMetric.WithLabelValues(chainId, apiInterface, disconnectReason).Inc() } + +func (pme *ConsumerMetricsManager) handleOptimizerQoS(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + var report OptimizerQoSReportToSend + if err := json.NewDecoder(r.Body).Decode(&report); err != nil { + http.Error(w, "Invalid request body", http.StatusBadRequest) + return + } + + // Process the received QoS report here + utils.LavaFormatDebug("Received QoS report", + utils.LogAttr("provider", report.ProviderAddress), + utils.LogAttr("chain_id", report.ChainId), + utils.LogAttr("sync_score", report.SyncScore), + utils.LogAttr("availability_score", report.AvailabilityScore), + utils.LogAttr("latency_score", report.LatencyScore), + ) + + w.WriteHeader(http.StatusOK) +} diff --git a/protocol/metrics/consumer_optimizer_qos_client.go b/protocol/metrics/consumer_optimizer_qos_client.go index f204107e17..3e12e23dfc 100644 --- a/protocol/metrics/consumer_optimizer_qos_client.go +++ b/protocol/metrics/consumer_optimizer_qos_client.go @@ -30,6 +30,7 @@ type ConsumerOptimizerQoSClient struct { chainIdToProviderToEpochToStake map[string]map[string]map[uint64]int64 // third key is epoch currentEpoch atomic.Uint64 lock sync.RWMutex + reportsToSend []OptimizerQoSReportToSend } type OptimizerQoSReport struct { @@ -41,7 +42,7 @@ type OptimizerQoSReport struct { EntryIndex int } -type optimizerQoSReportToSend struct { +type OptimizerQoSReportToSend struct { Timestamp time.Time `json:"timestamp"` SyncScore float64 `json:"sync_score"` AvailabilityScore float64 `json:"availability_score"` @@ -56,7 +57,7 @@ type optimizerQoSReportToSend struct { EntryIndex int `json:"entry_index"` } -func (oqosr optimizerQoSReportToSend) String() string { +func (oqosr OptimizerQoSReportToSend) String() string { bytes, err := json.Marshal(oqosr) if err != nil { return "" @@ -74,7 +75,6 @@ func NewConsumerOptimizerQoSClient(endpointAddress string, interval ...time.Dura utils.LavaFormatWarning("Error while getting hostname for ConsumerOptimizerQoSClient", err) hostname = "unknown" + strconv.FormatUint(rand.Uint64(), 10) // random seed for different unknowns } - return &ConsumerOptimizerQoSClient{ consumerOrigin: hostname, queueSender: NewQueueSender(endpointAddress, "ConsumerOptimizerQoS", nil, interval...), @@ -126,10 +126,9 @@ func (coqc *ConsumerOptimizerQoSClient) calculateNodeErrorRate(chainId, provider return 0 } -func (coqc *ConsumerOptimizerQoSClient) appendOptimizerQoSReport(report *OptimizerQoSReport, chainId string, epoch uint64) { +func (coqc *ConsumerOptimizerQoSClient) appendOptimizerQoSReport(report *OptimizerQoSReport, chainId string, epoch uint64) OptimizerQoSReportToSend { // must be called under read lock - - optimizerQoSReportToSend := optimizerQoSReportToSend{ + optimizerQoSReportToSend := OptimizerQoSReportToSend{ Timestamp: time.Now(), ConsumerOrigin: coqc.consumerOrigin, SyncScore: report.SyncScore, @@ -145,9 +144,10 @@ func (coqc *ConsumerOptimizerQoSClient) appendOptimizerQoSReport(report *Optimiz } coqc.queueSender.appendQueue(optimizerQoSReportToSend) + return optimizerQoSReportToSend } -func (coqc *ConsumerOptimizerQoSClient) getReportsFromOptimizers() { +func (coqc *ConsumerOptimizerQoSClient) getReportsFromOptimizers() []OptimizerQoSReportToSend { coqc.lock.RLock() // we only read from the maps here defer coqc.lock.RUnlock() @@ -156,7 +156,7 @@ func (coqc *ConsumerOptimizerQoSClient) getReportsFromOptimizers() { requestedBlock := spectypes.LATEST_BLOCK currentEpoch := coqc.currentEpoch.Load() - + reportsToSend := []OptimizerQoSReportToSend{} for chainId, optimizer := range coqc.optimizers { providersMap, ok := coqc.chainIdToProviderToEpochToStake[chainId] if !ok { @@ -165,9 +165,22 @@ func (coqc *ConsumerOptimizerQoSClient) getReportsFromOptimizers() { reports := optimizer.CalculateQoSScoresForMetrics(maps.Keys(providersMap), ignoredProviders, cu, requestedBlock) for _, report := range reports { - coqc.appendOptimizerQoSReport(report, chainId, currentEpoch) + reportsToSend = append(reportsToSend, coqc.appendOptimizerQoSReport(report, chainId, currentEpoch)) } } + return reportsToSend +} + +func (coqc *ConsumerOptimizerQoSClient) SetReportsToSend(reports []OptimizerQoSReportToSend) { + coqc.lock.Lock() + defer coqc.lock.Unlock() + coqc.reportsToSend = reports +} + +func (coqc *ConsumerOptimizerQoSClient) GetReportsToSend() []OptimizerQoSReportToSend { + coqc.lock.RLock() + defer coqc.lock.RUnlock() + return coqc.reportsToSend } func (coqc *ConsumerOptimizerQoSClient) StartOptimizersQoSReportsCollecting(ctx context.Context, samplingInterval time.Duration) { @@ -183,7 +196,7 @@ func (coqc *ConsumerOptimizerQoSClient) StartOptimizersQoSReportsCollecting(ctx utils.LavaFormatTrace("ConsumerOptimizerQoSClient context done") return case <-time.After(samplingInterval): - coqc.getReportsFromOptimizers() + coqc.SetReportsToSend(coqc.getReportsFromOptimizers()) } } }() diff --git a/protocol/rpcconsumer/rpcconsumer.go b/protocol/rpcconsumer/rpcconsumer.go index bfd554925e..99dcb15a01 100644 --- a/protocol/rpcconsumer/rpcconsumer.go +++ b/protocol/rpcconsumer/rpcconsumer.go @@ -106,6 +106,7 @@ type AnalyticsServerAddresses struct { RelayServerAddress string ReportsAddressFlag string OptimizerQoSAddress string + OptimizerQoSListen bool } type RPCConsumer struct { consumerStateTracker ConsumerStateTrackerInf @@ -133,14 +134,19 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt } options.refererData.ReferrerClient = metrics.NewConsumerReferrerClient(options.refererData.Address) consumerReportsManager := metrics.NewConsumerReportsClient(options.analyticsServerAddresses.ReportsAddressFlag) - consumerMetricsManager := metrics.NewConsumerMetricsManager(metrics.ConsumerMetricsManagerOptions{NetworkAddress: options.analyticsServerAddresses.MetricsListenAddress, AddMethodsApiGauge: options.analyticsServerAddresses.AddApiMethodCallsMetrics}) // start up prometheus metrics - consumerUsageServeManager := metrics.NewConsumerRelayServerClient(options.analyticsServerAddresses.RelayServerAddress) // start up relay server reporting + + consumerUsageServeManager := metrics.NewConsumerRelayServerClient(options.analyticsServerAddresses.RelayServerAddress) // start up relay server reporting var consumerOptimizerQoSClient *metrics.ConsumerOptimizerQoSClient - if options.analyticsServerAddresses.OptimizerQoSAddress != "" { + if options.analyticsServerAddresses.OptimizerQoSAddress != "" || options.analyticsServerAddresses.OptimizerQoSListen { consumerOptimizerQoSClient = metrics.NewConsumerOptimizerQoSClient(options.analyticsServerAddresses.OptimizerQoSAddress, metrics.OptimizerQosServerPushInterval) // start up optimizer qos client consumerOptimizerQoSClient.StartOptimizersQoSReportsCollecting(ctx, metrics.OptimizerQosServerSamplingInterval) // start up optimizer qos client } - + consumerMetricsManager := metrics.NewConsumerMetricsManager(metrics.ConsumerMetricsManagerOptions{ + NetworkAddress: options.analyticsServerAddresses.MetricsListenAddress, + AddMethodsApiGauge: options.analyticsServerAddresses.AddApiMethodCallsMetrics, + EnableQoSListener: options.analyticsServerAddresses.OptimizerQoSListen, + ConsumerOptimizerQoSClient: consumerOptimizerQoSClient, + }) // start up prometheus metrics rpcConsumerMetrics, err := metrics.NewRPCConsumerLogs(consumerMetricsManager, consumerUsageServeManager, consumerOptimizerQoSClient) if err != nil { utils.LavaFormatFatal("failed creating RPCConsumer logs", err) @@ -557,6 +563,7 @@ rpcconsumer consumer_examples/full_consumer_example.yml --cache-be "127.0.0.1:77 RelayServerAddress: viper.GetString(metrics.RelayServerFlagName), ReportsAddressFlag: viper.GetString(reportsSendBEAddress), OptimizerQoSAddress: viper.GetString(common.OptimizerQosServerAddressFlag), + OptimizerQoSListen: viper.GetBool(common.OptimizerQosListenFlag), } var refererData *chainlib.RefererData @@ -646,6 +653,7 @@ rpcconsumer consumer_examples/full_consumer_example.yml --cache-be "127.0.0.1:77 cmdRPCConsumer.Flags().IntVar(&provideroptimizer.OptimizerNumTiers, common.SetProviderOptimizerNumberOfTiersToCreate, 4, "set the number of groups to create, default is 4") // optimizer qos reports cmdRPCConsumer.Flags().String(common.OptimizerQosServerAddressFlag, "", "address to send optimizer qos reports to") + cmdRPCConsumer.Flags().Bool(common.OptimizerQosListenFlag, false, "enable listening for optimizer qos reports on metrics endpoint i.e GET -> localhost:7779/provider_optimizer_metrics") cmdRPCConsumer.Flags().DurationVar(&metrics.OptimizerQosServerPushInterval, common.OptimizerQosServerPushIntervalFlag, time.Minute*5, "interval to push optimizer qos reports") cmdRPCConsumer.Flags().DurationVar(&metrics.OptimizerQosServerSamplingInterval, common.OptimizerQosServerSamplingIntervalFlag, time.Second*1, "interval to sample optimizer qos reports") cmdRPCConsumer.Flags().IntVar(&chainlib.WebSocketRateLimit, common.RateLimitWebSocketFlag, chainlib.WebSocketRateLimit, "rate limit (per second) websocket requests per user connection, default is unlimited") diff --git a/protocol/rpcconsumer/rpcconsumer_server.go b/protocol/rpcconsumer/rpcconsumer_server.go index 427d615102..30b63f1e25 100644 --- a/protocol/rpcconsumer/rpcconsumer_server.go +++ b/protocol/rpcconsumer/rpcconsumer_server.go @@ -1485,6 +1485,12 @@ func (rpccs *RPCConsumerServer) appendHeadersToRelayResult(ctx context.Context, directiveHeaders := protocolMessage.GetDirectiveHeaders() _, debugRelays := directiveHeaders[common.LAVA_DEBUG_RELAY] if debugRelays { + metadataReply = append(metadataReply, + pairingtypes.Metadata{ + Name: common.REQUESTED_BLOCK_HEADER_NAME, + Value: strconv.FormatInt(protocolMessage.RelayPrivateData().GetRequestBlock(), 10), + }) + routerKey := lavasession.NewRouterKeyFromExtensions(protocolMessage.GetExtensions()) erroredProviders := relayProcessor.GetUsedProviders().GetErroredProviders(routerKey) if len(erroredProviders) > 0 { diff --git a/scripts/pre_setups/init_lava_only_with_node.sh b/scripts/pre_setups/init_lava_only_with_node.sh index d99ddc4094..c47abc0ada 100755 --- a/scripts/pre_setups/init_lava_only_with_node.sh +++ b/scripts/pre_setups/init_lava_only_with_node.sh @@ -57,7 +57,7 @@ wait_next_block screen -d -m -S consumers bash -c "source ~/.bashrc; lavap rpcconsumer \ 127.0.0.1:3360 LAV1 rest 127.0.0.1:3361 LAV1 tendermintrpc 127.0.0.1:3362 LAV1 grpc \ -$EXTRA_PORTAL_FLAGS --geolocation 1 --log_level trace --from user1 --chain-id lava --add-api-method-metrics --allow-insecure-provider-dialing --metrics-listen-address ":7779" 2>&1 | tee $LOGS_DIR/CONSUMERS.log" && sleep 0.25 +$EXTRA_PORTAL_FLAGS --geolocation 1 --optimizer-qos-listen --log_level trace --from user1 --chain-id lava --add-api-method-metrics --allow-insecure-provider-dialing --metrics-listen-address ":7779" 2>&1 | tee $LOGS_DIR/CONSUMERS.log" && sleep 0.25 echo "--- setting up screens done ---" screen -ls \ No newline at end of file