Skip to content

Commit

Permalink
refactor(zetaclient): move rpc latency logs to metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
gartnera committed Dec 17, 2024
1 parent 6730285 commit 8b857e6
Show file tree
Hide file tree
Showing 22 changed files with 57 additions and 154 deletions.
27 changes: 4 additions & 23 deletions zetaclient/chains/base/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ func NewObserver(
zetacoreClient interfaces.ZetacoreClient,
tss interfaces.TSSSigner,
blockCacheSize int,
rpcAlertLatency int64,
ts *metrics.TelemetryServer,
database *db.DB,
logger Logger,
Expand All @@ -105,7 +104,6 @@ func NewObserver(
lastBlock: 0,
lastBlockScanned: 0,
lastTxScanned: "",
rpcAlertLatency: time.Duration(rpcAlertLatency) * time.Second,
ts: ts,
db: database,
blockCache: blockCache,
Expand Down Expand Up @@ -429,29 +427,12 @@ func (ob *Observer) PostVoteInbound(
return ballot, nil
}

// AlertOnRPCLatency prints an alert if the RPC latency exceeds the threshold.
// Returns true if the RPC latency is too high.
func (ob *Observer) AlertOnRPCLatency(latestBlockTime time.Time, defaultAlertLatency time.Duration) bool {
// ReportBlockLatency records the latency between the current time
// an the latest block time for a chain as a metric
func (ob *Observer) ReportBlockLatency(latestBlockTime time.Time) {

Check warning on line 432 in zetaclient/chains/base/observer.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/chains/base/observer.go#L432

Added line #L432 was not covered by tests
elapsedTime := time.Since(latestBlockTime)

alertLatency := ob.rpcAlertLatency
if alertLatency == 0 {
alertLatency = defaultAlertLatency
}

lf := map[string]any{
"rpc_latency_alert_ms": alertLatency.Milliseconds(),
"rpc_latency_real_ms": elapsedTime.Milliseconds(),
}

if elapsedTime > alertLatency {
ob.logger.Chain.Error().Fields(lf).Msg("RPC latency is too high, please check the node or explorer")
return true
}

ob.logger.Chain.Info().Fields(lf).Msg("RPC latency is OK")

return false
metrics.LatestBlockLatency.WithLabelValues(ob.chain.Name).Set(elapsedTime.Seconds())

Check warning on line 435 in zetaclient/chains/base/observer.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/chains/base/observer.go#L435

Added line #L435 was not covered by tests
}

// EnvVarLatestBlockByChain returns the environment variable for the last block by chain.
Expand Down
103 changes: 25 additions & 78 deletions zetaclient/chains/base/observer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"os"
"strings"
"testing"
"time"

"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
Expand All @@ -25,9 +24,6 @@ import (
)

const (
// defaultAlertLatency is the default alert latency (in seconds) for unit tests
defaultAlertLatency = 60

// defaultConfirmationCount is the default confirmation count for unit tests
defaultConfirmationCount = 2
)
Expand All @@ -40,7 +36,7 @@ type testSuite struct {
}

// newTestSuite creates a new observer for testing
func newTestSuite(t *testing.T, chain chains.Chain, alertLatency int64) *testSuite {
func newTestSuite(t *testing.T, chain chains.Chain) *testSuite {
// constructor parameters
chainParams := *sample.ChainParams(chain.ChainId)
chainParams.ConfirmationCount = defaultConfirmationCount
Expand All @@ -57,7 +53,6 @@ func newTestSuite(t *testing.T, chain chains.Chain, alertLatency int64) *testSui
zetacoreClient,
tss,
base.DefaultBlockCacheSize,
alertLatency,
nil,
database,
logger,
Expand Down Expand Up @@ -127,7 +122,6 @@ func TestNewObserver(t *testing.T) {
tt.zetacoreClient,
tt.tss,
tt.blockCacheSize,
60,
nil,
database,
base.DefaultLogger(),
Expand All @@ -147,7 +141,7 @@ func TestNewObserver(t *testing.T) {
func TestStop(t *testing.T) {
t.Run("should be able to stop observer", func(t *testing.T) {
// create observer and initialize db
ob := newTestSuite(t, chains.Ethereum, defaultAlertLatency)
ob := newTestSuite(t, chains.Ethereum)

// stop observer
ob.Stop()
Expand All @@ -158,7 +152,7 @@ func TestObserverGetterAndSetter(t *testing.T) {
chain := chains.Ethereum

t.Run("should be able to update last block", func(t *testing.T) {
ob := newTestSuite(t, chain, defaultAlertLatency)
ob := newTestSuite(t, chain)

// update last block
newLastBlock := uint64(100)
Expand All @@ -167,7 +161,7 @@ func TestObserverGetterAndSetter(t *testing.T) {
})

t.Run("should be able to update last block scanned", func(t *testing.T) {
ob := newTestSuite(t, chain, defaultAlertLatency)
ob := newTestSuite(t, chain)

// update last block scanned
newLastBlockScanned := uint64(100)
Expand All @@ -176,7 +170,7 @@ func TestObserverGetterAndSetter(t *testing.T) {
})

t.Run("should be able to update last tx scanned", func(t *testing.T) {
ob := newTestSuite(t, chain, defaultAlertLatency)
ob := newTestSuite(t, chain)

// update last tx scanned
newLastTxScanned := sample.EthAddress().String()
Expand All @@ -185,7 +179,7 @@ func TestObserverGetterAndSetter(t *testing.T) {
})

t.Run("should be able to get logger", func(t *testing.T) {
ob := newTestSuite(t, chain, defaultAlertLatency)
ob := newTestSuite(t, chain)
logger := ob.Logger()

// should be able to print log
Expand Down Expand Up @@ -233,7 +227,7 @@ func TestTSSAddressString(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// create observer
ob := newTestSuite(t, tt.chain, defaultAlertLatency)
ob := newTestSuite(t, tt.chain)

// get TSS address
addr := ob.TSSAddressString()
Expand Down Expand Up @@ -286,7 +280,7 @@ func TestIsBlockConfirmed(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// create observer
ob := newTestSuite(t, tt.chain, defaultAlertLatency)
ob := newTestSuite(t, tt.chain)
ob.Observer.WithLastBlock(tt.lastBlock)

// check if block is confirmed
Expand Down Expand Up @@ -318,7 +312,7 @@ func TestOutboundID(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// create observer
ob := newTestSuite(t, tt.chain, defaultAlertLatency)
ob := newTestSuite(t, tt.chain)

// get outbound id
outboundID := ob.OutboundID(tt.nonce)
Expand All @@ -336,7 +330,7 @@ func TestLoadLastBlockScanned(t *testing.T) {

t.Run("should be able to load last block scanned", func(t *testing.T) {
// create observer and open db
ob := newTestSuite(t, chain, defaultAlertLatency)
ob := newTestSuite(t, chain)

// create db and write 100 as last block scanned
err := ob.WriteLastBlockScannedToDB(100)
Expand All @@ -350,7 +344,7 @@ func TestLoadLastBlockScanned(t *testing.T) {

t.Run("latest block scanned should be 0 if not found in db", func(t *testing.T) {
// create observer and open db
ob := newTestSuite(t, chain, defaultAlertLatency)
ob := newTestSuite(t, chain)

// read last block scanned
err := ob.LoadLastBlockScanned(log.Logger)
Expand All @@ -360,7 +354,7 @@ func TestLoadLastBlockScanned(t *testing.T) {

t.Run("should overwrite last block scanned if env var is set", func(t *testing.T) {
// create observer and open db
ob := newTestSuite(t, chain, defaultAlertLatency)
ob := newTestSuite(t, chain)

// create db and write 100 as last block scanned
ob.WriteLastBlockScannedToDB(100)
Expand All @@ -376,7 +370,7 @@ func TestLoadLastBlockScanned(t *testing.T) {

t.Run("last block scanned should remain 0 if env var is set to latest", func(t *testing.T) {
// create observer and open db
ob := newTestSuite(t, chain, defaultAlertLatency)
ob := newTestSuite(t, chain)

// create db and write 100 as last block scanned
ob.WriteLastBlockScannedToDB(100)
Expand All @@ -392,7 +386,7 @@ func TestLoadLastBlockScanned(t *testing.T) {

t.Run("should return error on invalid env var", func(t *testing.T) {
// create observer and open db
ob := newTestSuite(t, chain, defaultAlertLatency)
ob := newTestSuite(t, chain)

// set invalid env var
os.Setenv(envvar, "invalid")
Expand All @@ -406,7 +400,7 @@ func TestLoadLastBlockScanned(t *testing.T) {
func TestSaveLastBlockScanned(t *testing.T) {
t.Run("should be able to save last block scanned", func(t *testing.T) {
// create observer and open db
ob := newTestSuite(t, chains.Ethereum, defaultAlertLatency)
ob := newTestSuite(t, chains.Ethereum)

// save 100 as last block scanned
err := ob.SaveLastBlockScanned(100)
Expand All @@ -426,7 +420,7 @@ func TestReadWriteDBLastBlockScanned(t *testing.T) {
chain := chains.Ethereum
t.Run("should be able to write and read last block scanned to db", func(t *testing.T) {
// create observer and open db
ob := newTestSuite(t, chain, defaultAlertLatency)
ob := newTestSuite(t, chain)

// write last block scanned
err := ob.WriteLastBlockScannedToDB(100)
Expand All @@ -439,7 +433,7 @@ func TestReadWriteDBLastBlockScanned(t *testing.T) {

t.Run("should return error when last block scanned not found in db", func(t *testing.T) {
// create empty db
ob := newTestSuite(t, chain, defaultAlertLatency)
ob := newTestSuite(t, chain)

lastScannedBlock, err := ob.ReadLastBlockScannedFromDB()
require.Error(t, err)
Expand All @@ -453,7 +447,7 @@ func TestLoadLastTxScanned(t *testing.T) {

t.Run("should be able to load last tx scanned", func(t *testing.T) {
// create observer and open db
ob := newTestSuite(t, chain, defaultAlertLatency)
ob := newTestSuite(t, chain)

// create db and write sample hash as last tx scanned
ob.WriteLastTxScannedToDB(lastTx)
Expand All @@ -465,7 +459,7 @@ func TestLoadLastTxScanned(t *testing.T) {

t.Run("latest tx scanned should be empty if not found in db", func(t *testing.T) {
// create observer and open db
ob := newTestSuite(t, chain, defaultAlertLatency)
ob := newTestSuite(t, chain)

// read last tx scanned
ob.LoadLastTxScanned()
Expand All @@ -474,7 +468,7 @@ func TestLoadLastTxScanned(t *testing.T) {

t.Run("should overwrite last tx scanned if env var is set", func(t *testing.T) {
// create observer and open db
ob := newTestSuite(t, chain, defaultAlertLatency)
ob := newTestSuite(t, chain)

// create db and write sample hash as last tx scanned
ob.WriteLastTxScannedToDB(lastTx)
Expand All @@ -493,7 +487,7 @@ func TestSaveLastTxScanned(t *testing.T) {
chain := chains.SolanaDevnet
t.Run("should be able to save last tx scanned", func(t *testing.T) {
// create observer and open db
ob := newTestSuite(t, chain, defaultAlertLatency)
ob := newTestSuite(t, chain)

// save random tx hash
lastSlot := uint64(100)
Expand All @@ -516,7 +510,7 @@ func TestReadWriteDBLastTxScanned(t *testing.T) {
chain := chains.SolanaDevnet
t.Run("should be able to write and read last tx scanned to db", func(t *testing.T) {
// create observer and open db
ob := newTestSuite(t, chain, defaultAlertLatency)
ob := newTestSuite(t, chain)

// write last tx scanned
lastTx := "5LuQMorgd11p8GWEw6pmyHCDtA26NUyeNFhLWPNk2oBoM9pkag1LzhwGSRos3j4TJLhKjswFhZkGtvSGdLDkmqsk"
Expand All @@ -530,7 +524,7 @@ func TestReadWriteDBLastTxScanned(t *testing.T) {

t.Run("should return error when last tx scanned not found in db", func(t *testing.T) {
// create empty db
ob := newTestSuite(t, chain, defaultAlertLatency)
ob := newTestSuite(t, chain)

lastTxScanned, err := ob.ReadLastTxScannedFromDB()
require.Error(t, err)
Expand All @@ -541,7 +535,7 @@ func TestReadWriteDBLastTxScanned(t *testing.T) {
func TestPostVoteInbound(t *testing.T) {
t.Run("should be able to post vote inbound", func(t *testing.T) {
// create observer
ob := newTestSuite(t, chains.Ethereum, defaultAlertLatency)
ob := newTestSuite(t, chains.Ethereum)

ob.zetacore.WithPostVoteInbound("", "sampleBallotIndex")

Expand All @@ -554,7 +548,7 @@ func TestPostVoteInbound(t *testing.T) {

t.Run("should not post vote if message basic validation fails", func(t *testing.T) {
// create observer
ob := newTestSuite(t, chains.Ethereum, defaultAlertLatency)
ob := newTestSuite(t, chains.Ethereum)

// create sample message with long Message
msg := sample.InboundVote(coin.CoinType_Gas, chains.Ethereum.ChainId, chains.ZetaChainMainnet.ChainId)
Expand All @@ -567,53 +561,6 @@ func TestPostVoteInbound(t *testing.T) {
})
}

func TestAlertOnRPCLatency(t *testing.T) {
now := time.Now()

tests := []struct {
name string
blockTime time.Time
alertLatency int64
alerted bool
}{
{
name: "should alert on high RPC latency",
blockTime: now.Add(-60 * time.Second),
alertLatency: 55,
alerted: true,
},
{
name: "should not alert on normal RPC latency",
blockTime: now.Add(-60 * time.Second),
alertLatency: 65,
alerted: false,
},
{
name: "should alert on higher RPC latency then default",
blockTime: now.Add(-65 * time.Second),
alertLatency: 0, // 0 means not set
alerted: true,
},
{
name: "should not alert on normal RPC latency when compared to default",
blockTime: now.Add(-55 * time.Second),
alertLatency: 0, // 0 means not set
alerted: false,
},
}

// run tests
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// create observer
ob := newTestSuite(t, chains.Ethereum, tt.alertLatency)

alerted := ob.AlertOnRPCLatency(tt.blockTime, time.Duration(defaultAlertLatency)*time.Second)
require.Equal(t, tt.alerted, alerted)
})
}
}

func createDatabase(t *testing.T) *db.DB {
sqlDatabase, err := db.NewFromSqliteInMemory(true)
require.NoError(t, err)
Expand Down
2 changes: 0 additions & 2 deletions zetaclient/chains/bitcoin/observer/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ func NewObserver(
chainParams observertypes.ChainParams,
zetacoreClient interfaces.ZetacoreClient,
tss interfaces.TSSSigner,
rpcAlertLatency int64,
database *db.DB,
logger base.Logger,
ts *metrics.TelemetryServer,
Expand All @@ -107,7 +106,6 @@ func NewObserver(
zetacoreClient,
tss,
btcBlocksPerDay,
rpcAlertLatency,
ts,
database,
logger,
Expand Down
2 changes: 0 additions & 2 deletions zetaclient/chains/bitcoin/observer/observer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ func Test_NewObserver(t *testing.T) {
tt.chainParams,
tt.coreClient,
tt.tss,
60,
database,
tt.logger,
tt.ts,
Expand Down Expand Up @@ -326,7 +325,6 @@ func newTestSuite(t *testing.T, chain chains.Chain) *testSuite {
chainParams,
zetacore,
nil,
60,
database,
base.Logger{Std: log, Compliance: log},
nil,
Expand Down
Loading

0 comments on commit 8b857e6

Please sign in to comment.