Skip to content

Commit

Permalink
Numerus changes
Browse files Browse the repository at this point in the history
  • Loading branch information
shleikes committed Oct 22, 2024
1 parent e2b9120 commit 9563fd1
Show file tree
Hide file tree
Showing 8 changed files with 241 additions and 239 deletions.
44 changes: 32 additions & 12 deletions protocol/metrics/metrics_provider_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ type ProviderMetricsManager struct {
relaysMonitors map[string]*RelaysMonitor
relaysMonitorsLock sync.RWMutex
frozenStatusMetric *prometheus.GaugeVec
jailedStatusMetric *prometheus.GaugeVec
jailStatusMetric *prometheus.GaugeVec
jailedCountMetric *prometheus.GaugeVec
}

func NewProviderMetricsManager(networkAddress string) *ProviderMetricsManager {
Expand Down Expand Up @@ -113,29 +114,38 @@ func NewProviderMetricsManager(networkAddress string) *ProviderMetricsManager {
Name: "lava_provider_fetch_block_success",
Help: "The total number of get specific block queries that succeeded by chainfetcher",
}, []string{"spec"})

virtualEpochMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "virtual_epoch",
Help: "The current virtual epoch measured",
}, []string{"spec"})

endpointsHealthChecksOkMetric := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "lava_provider_overall_health",
Help: "At least one endpoint is healthy",
})
endpointsHealthChecksOkMetric.Set(1)

frozenStatusMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "lava_provider_frozen_status",
Help: "Frozen: 1, Healthy: 0",
}, []string{"chainID", "address"})
Help: "Frozen: 1, Not Frozen: 0",
}, []string{"chainID"})

jailedStatusMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "lava_provider_jailed_status",
jailStatusMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "lava_provider_jail_status",
Help: "Jailed: 1, Not Jailed: 0",
}, []string{"chainID"})

jailedCountMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "lava_provider_jailed_count",
Help: "The amount of times the provider was jailed in the last 24 hours",
}, []string{"chainID", "address"})
}, []string{"chainID"})

protocolVersionMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "lava_provider_protocol_version",
Help: "The current running lavap version for the process. major := version / 1000000, minor := (version / 1000) % 1000 patch := version % 1000",
}, []string{"version"})

// Register the metrics with the Prometheus registry.
prometheus.MustRegister(totalCUServicedMetric)
prometheus.MustRegister(totalCUPaidMetric)
Expand All @@ -153,7 +163,8 @@ func NewProviderMetricsManager(networkAddress string) *ProviderMetricsManager {
prometheus.MustRegister(endpointsHealthChecksOkMetric)
prometheus.MustRegister(protocolVersionMetric)
prometheus.MustRegister(frozenStatusMetric)
prometheus.MustRegister(jailedStatusMetric)
prometheus.MustRegister(jailStatusMetric)
prometheus.MustRegister(jailedCountMetric)

providerMetricsManager := &ProviderMetricsManager{
providerMetrics: map[string]*ProviderMetrics{},
Expand All @@ -175,7 +186,8 @@ func NewProviderMetricsManager(networkAddress string) *ProviderMetricsManager {
protocolVersionMetric: protocolVersionMetric,
relaysMonitors: map[string]*RelaysMonitor{},
frozenStatusMetric: frozenStatusMetric,
jailedStatusMetric: jailedStatusMetric,
jailStatusMetric: jailStatusMetric,
jailedCountMetric: jailedCountMetric,
}

http.Handle("/metrics", promhttp.Handler())
Expand Down Expand Up @@ -366,18 +378,26 @@ func (pme *ProviderMetricsManager) RegisterRelaysMonitor(chainID, apiInterface s
pme.relaysMonitors[chainID+apiInterface] = relaysMonitor
}

func (pme *ProviderMetricsManager) SetFrozenStatus(frozenStatus float64, chain string, address string) {
// SetFrozenStatus records the frozen state of the provider on the given chain.
// The gauge is labelled by chain only and is set to utils.Btof(frozen).
// Calling it on a nil manager is a no-op.
func (pme *ProviderMetricsManager) SetFrozenStatus(chain string, frozen bool) {
	if pme == nil {
		return
	}

	frozenGauge := pme.frozenStatusMetric.WithLabelValues(chain)
	frozenGauge.Set(utils.Btof(frozen))
}

func (pme *ProviderMetricsManager) SetJailStatus(chain string, jailed bool) {
if pme == nil {
return
}

pme.frozenStatusMetric.WithLabelValues(chain, address).Set(frozenStatus)
pme.jailStatusMetric.WithLabelValues(chain).Set(utils.Btof(jailed))
}

func (pme *ProviderMetricsManager) SetJailedStatus(jailedCounter uint64, chain string, address string) {
func (pme *ProviderMetricsManager) SetJailedCount(chain string, jailedCount uint64) {
if pme == nil {
return
}

pme.jailedStatusMetric.WithLabelValues(chain, address).Set(float64(jailedCounter))
pme.jailedCountMetric.WithLabelValues(chain).Set(float64(jailedCount))
}
22 changes: 10 additions & 12 deletions protocol/rpcprovider/rpcprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,8 @@ func (rpcp *RPCProvider) Start(options *rpcProviderStartOptions) (err error) {
rpcp.rewardServer = rewardserver.NewRewardServer(providerStateTracker, rpcp.providerMetricsManager, rewardDB, options.rewardStoragePath, options.rewardsSnapshotThreshold, options.rewardsSnapshotTimeoutSec, rpcp)
rpcp.providerStateTracker.RegisterForEpochUpdates(ctx, rpcp.rewardServer)
rpcp.providerStateTracker.RegisterPaymentUpdatableForPayments(ctx, rpcp.rewardServer)
rpcp.createAndRegisterFreezeUpdatersByOptions(options, ctx)
}

keyName, err := sigs.GetKeyName(options.clientCtx)
if err != nil {
utils.LavaFormatFatal("failed getting key name from clientCtx", err)
Expand All @@ -211,8 +211,13 @@ func (rpcp *RPCProvider) Start(options *rpcProviderStartOptions) (err error) {
if err != nil {
utils.LavaFormatFatal("failed unmarshaling public address", err, utils.Attribute{Key: "keyName", Value: keyName}, utils.Attribute{Key: "pubkey", Value: pubKey.Address()})
}

utils.LavaFormatInfo("RPCProvider pubkey: " + rpcp.addr.String())

rpcp.createAndRegisterFreezeUpdatersByOptions(ctx, options.clientCtx, rpcp.addr.String())

utils.LavaFormatInfo("RPCProvider setting up endpoints", utils.Attribute{Key: "count", Value: strconv.Itoa(len(options.rpcProviderEndpoints))})

blockMemorySize, err := rpcp.providerStateTracker.GetEpochSizeMultipliedByRecommendedEpochNumToCollectPayment(ctx) // get the number of blocks to keep in PSM.
if err != nil {
utils.LavaFormatFatal("Failed fetching GetEpochSizeMultipliedByRecommendedEpochNumToCollectPayment in RPCProvider Start", err)
Expand Down Expand Up @@ -272,17 +277,10 @@ func (rpcp *RPCProvider) Start(options *rpcProviderStartOptions) (err error) {
return nil
}

func (rpcp *RPCProvider) createAndRegisterFreezeUpdatersByOptions(options *rpcProviderStartOptions, ctx context.Context) {
providerFreezeUpdaterByChainId := make(map[string]*updaters.ProviderFreezeUpdater)
for _, rpcProviderEndpoint := range options.rpcProviderEndpoints {
_, freezeUpdaterExists := providerFreezeUpdaterByChainId[rpcProviderEndpoint.ChainID]
if !freezeUpdaterExists {
stateQuery := updaters.NewProviderStateQuery(ctx, options.clientCtx)
freezeUpdater := updaters.NewProviderFreezeUpdater(stateQuery.PairingQueryClient, rpcProviderEndpoint.ChainID, options.clientCtx.FromAddress.String(), rpcp.providerMetricsManager)
rpcp.providerStateTracker.RegisterForEpochUpdates(ctx, freezeUpdater)
providerFreezeUpdaterByChainId[rpcProviderEndpoint.ChainID] = freezeUpdater
}
}
// createAndRegisterFreezeUpdatersByOptions builds a single freeze/jail updater
// for publicAddress, backed by a pairing query client created from clientCtx,
// and registers it with the provider state tracker for epoch updates.
func (rpcp *RPCProvider) createAndRegisterFreezeUpdatersByOptions(ctx context.Context, clientCtx client.Context, publicAddress string) {
	freezeJailUpdater := updaters.NewProviderFreezeJailUpdater(
		pairingtypes.NewQueryClient(clientCtx),
		publicAddress,
		rpcp.providerMetricsManager,
	)
	rpcp.providerStateTracker.RegisterForEpochUpdates(ctx, freezeJailUpdater)
}

func getActiveEndpoints(rpcProviderEndpoints []*lavasession.RPCProviderEndpoint, disabledEndpointsList []*lavasession.RPCProviderEndpoint) []*lavasession.RPCProviderEndpoint {
Expand Down
71 changes: 71 additions & 0 deletions protocol/statetracker/updaters/provider_freeze_jail_updater.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package updaters

import (
"context"
"time"

"github.com/lavanet/lava/v4/utils"
pairingtypes "github.com/lavanet/lava/v4/x/pairing/types"
"google.golang.org/grpc"
)

const (
// CallbackKeyForFreezeUpdate is the string key "freeze-update".
// NOTE(review): nothing in this file references it — presumably it identifies
// this updater's callback in the state tracker; confirm against callers.
CallbackKeyForFreezeUpdate = "freeze-update"
)

// ProviderPairingStatusStateQueryInf is the part of a pairing query client that
// ProviderFreezeJailUpdater depends on: one Provider query whose response
// carries the stake entries inspected on every epoch update.
type ProviderPairingStatusStateQueryInf interface {
// Provider runs the provider query described by in and returns its response.
Provider(ctx context.Context, in *pairingtypes.QueryProviderRequest, opts ...grpc.CallOption) (*pairingtypes.QueryProviderResponse, error)
}

// ProviderMetricsManagerInf is the metrics sink that ProviderFreezeJailUpdater
// reports to. Every method takes the chain the value belongs to as its first
// argument.
type ProviderMetricsManagerInf interface {
	// SetFrozenStatus reports whether the provider is frozen on chain.
	SetFrozenStatus(chain string, frozen bool)
	// SetJailStatus reports whether the provider is jailed on chain.
	SetJailStatus(chain string, jailed bool)
	// SetJailedCount reports the provider's jail counter on chain.
	SetJailedCount(chain string, jailedCount uint64)
}

// FrozenStatus is an unsigned enumeration of a provider's freeze state.
// NOTE(review): no code in this file uses it — confirm it is still needed now
// that the metrics interface takes a bool.
type FrozenStatus uint64

const (
// AVAILABLE is the zero value (0).
AVAILABLE FrozenStatus = iota
// FROZEN follows AVAILABLE (1).
FROZEN
)

// ProviderFreezeJailUpdater queries the pairing module for one provider
// address on every epoch update and forwards the jail and freeze state of its
// stake entries to a metrics manager.
type ProviderFreezeJailUpdater struct {
// pairingQueryClient performs the Provider query.
pairingQueryClient ProviderPairingStatusStateQueryInf
// metricsManager receives the per-chain jail count, jail status and frozen status.
metricsManager ProviderMetricsManagerInf
// publicAddress is the address sent in the query and matched against each returned stake entry.
publicAddress string
}

// NewProviderFreezeJailUpdater returns an updater that queries
// pairingQueryClient for publicAddress and reports the results to
// metricsManager. The arguments are stored as given; nothing is queried here.
func NewProviderFreezeJailUpdater(
	pairingQueryClient ProviderPairingStatusStateQueryInf,
	publicAddress string,
	metricsManager ProviderMetricsManagerInf,
) *ProviderFreezeJailUpdater {
	updater := new(ProviderFreezeJailUpdater)
	updater.pairingQueryClient = pairingQueryClient
	updater.metricsManager = metricsManager
	updater.publicAddress = publicAddress
	return updater
}

// UpdateEpoch queries the pairing module for this updater's public address and
// pushes the jail count, jail status and frozen status of every matching stake
// entry to the metrics manager, keyed by the entry's chain.
//
// The query runs under a 10 second timeout. If it fails, the error is logged
// and no metric is touched. An entry is reported as frozen when IsFrozen() is
// true or when its StakeAppliedBlock is greater than epoch.
func (pfu *ProviderFreezeJailUpdater) UpdateEpoch(epoch uint64) {
	queryCtx, release := context.WithTimeout(context.Background(), 10*time.Second)
	request := &pairingtypes.QueryProviderRequest{Address: pfu.publicAddress}
	response, err := pfu.pairingQueryClient.Provider(queryCtx, request)
	// The timeout context is only needed for the call itself.
	release()

	if err != nil {
		utils.LavaFormatError("Failed querying pairing client for provider", err)
		return
	}

	for _, entry := range response.StakeEntries {
		// Entries for another address should never come back, but skip them just in case.
		// NOTE(review): the second check is given the entry's own address rather
		// than pfu.publicAddress — confirm that is the intended argument.
		isOurs := entry.Address == pfu.publicAddress && entry.IsAddressVaultOrProvider(entry.Address)
		if !isOurs {
			continue
		}

		chain := entry.Chain
		pfu.metricsManager.SetJailedCount(chain, entry.Jails)

		nowUnix := time.Now().UTC().Unix()
		pfu.metricsManager.SetJailStatus(chain, entry.IsJailed(nowUnix))

		frozen := entry.IsFrozen() || entry.StakeAppliedBlock > epoch
		pfu.metricsManager.SetFrozenStatus(chain, frozen)
	}
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 9563fd1

Please sign in to comment.