Skip to content

Commit

Permalink
move rpcAlertLatency argument out of rpc methods
Browse files Browse the repository at this point in the history
  • Loading branch information
ws4charlie committed Aug 30, 2024
1 parent 3d55b51 commit 826da7b
Show file tree
Hide file tree
Showing 14 changed files with 187 additions and 165 deletions.
26 changes: 21 additions & 5 deletions zetaclient/chains/base/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,11 +252,6 @@ func (ob *Observer) WithLastTxScanned(txHash string) *Observer {
return ob
}

// RPCAlertLatency returns the RPC alert latency for the observer.
func (ob *Observer) RPCAlertLatency() time.Duration {
return ob.rpcAlertLatency
}

// BlockCache returns the block cache for the observer.
func (ob *Observer) BlockCache() *lru.Cache {
return ob.blockCache
Expand Down Expand Up @@ -463,6 +458,27 @@ func (ob *Observer) PostVoteInbound(
return ballot, err
}

// AlertOnRPCLatency prints an alert if the RPC latency exceeds the threshold.
// Returns true if the RPC latency is too high.
func (ob *Observer) AlertOnRPCLatency(latestBlockTime time.Time, defaultAlertLatency time.Duration) bool {
// use configured alert latency if set
alertLatency := defaultAlertLatency
if ob.rpcAlertLatency > 0 {
alertLatency = ob.rpcAlertLatency
}

// latest block should not be too old
elapsedTime := time.Since(latestBlockTime)
if elapsedTime > alertLatency {
ob.logger.Chain.Error().
Msgf("RPC is stale: latest block is %.0f seconds old, RPC down or chain stuck (check explorer)?", elapsedTime.Seconds())
return true
}

ob.logger.Chain.Info().Msgf("RPC is OK: latest block is %.0f seconds old", elapsedTime.Seconds())
return false
}

// EnvVarLatestBlockByChain returns the environment variable for the last block by chain.
func EnvVarLatestBlockByChain(chain chains.Chain) string {
return fmt.Sprintf("CHAIN_%d_SCAN_FROM_BLOCK", chain.ChainId)
Expand Down
120 changes: 83 additions & 37 deletions zetaclient/chains/base/observer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"os"
"testing"
"time"

lru "github.com/hashicorp/golang-lru"
"github.com/rs/zerolog"
Expand All @@ -23,8 +24,13 @@ import (
"github.com/zeta-chain/zetacore/zetaclient/testutils/mocks"
)

const (
// defaultAlertLatency is the default alert latency for unit tests
defaultAlertLatency = 60 * time.Second
)

// createObserver creates a new observer for testing
func createObserver(t *testing.T, chain chains.Chain) *base.Observer {
func createObserver(t *testing.T, chain chains.Chain, alertLatency time.Duration) *base.Observer {
// constructor parameters
chainParams := *sample.ChainParams(chain.ChainId)
zetacoreClient := mocks.NewZetacoreClient(t)
Expand All @@ -41,7 +47,7 @@ func createObserver(t *testing.T, chain chains.Chain) *base.Observer {
tss,
base.DefaultBlockCacheSize,
base.DefaultHeaderCacheSize,
60,
alertLatency,
nil,
database,
logger,
Expand Down Expand Up @@ -143,7 +149,7 @@ func TestNewObserver(t *testing.T) {
func TestStop(t *testing.T) {
t.Run("should be able to stop observer", func(t *testing.T) {
// create observer and initialize db
ob := createObserver(t, chains.Ethereum)
ob := createObserver(t, chains.Ethereum, defaultAlertLatency)

// stop observer
ob.Stop()
Expand All @@ -154,7 +160,7 @@ func TestObserverGetterAndSetter(t *testing.T) {
chain := chains.Ethereum

t.Run("should be able to update chain", func(t *testing.T) {
ob := createObserver(t, chain)
ob := createObserver(t, chain, defaultAlertLatency)

// update chain
newChain := chains.BscMainnet
Expand All @@ -163,7 +169,7 @@ func TestObserverGetterAndSetter(t *testing.T) {
})

t.Run("should be able to update chain params", func(t *testing.T) {
ob := createObserver(t, chain)
ob := createObserver(t, chain, defaultAlertLatency)

// update chain params
newChainParams := *sample.ChainParams(chains.BscMainnet.ChainId)
Expand All @@ -172,7 +178,7 @@ func TestObserverGetterAndSetter(t *testing.T) {
})

t.Run("should be able to update zetacore client", func(t *testing.T) {
ob := createObserver(t, chain)
ob := createObserver(t, chain, defaultAlertLatency)

// update zetacore client
newZetacoreClient := mocks.NewZetacoreClient(t)
Expand All @@ -181,7 +187,7 @@ func TestObserverGetterAndSetter(t *testing.T) {
})

t.Run("should be able to update tss", func(t *testing.T) {
ob := createObserver(t, chain)
ob := createObserver(t, chain, defaultAlertLatency)

// update tss
newTSS := mocks.NewTSSAthens3()
Expand All @@ -190,7 +196,7 @@ func TestObserverGetterAndSetter(t *testing.T) {
})

t.Run("should be able to update last block", func(t *testing.T) {
ob := createObserver(t, chain)
ob := createObserver(t, chain, defaultAlertLatency)

// update last block
newLastBlock := uint64(100)
Expand All @@ -199,7 +205,7 @@ func TestObserverGetterAndSetter(t *testing.T) {
})

t.Run("should be able to update last block scanned", func(t *testing.T) {
ob := createObserver(t, chain)
ob := createObserver(t, chain, defaultAlertLatency)

// update last block scanned
newLastBlockScanned := uint64(100)
Expand All @@ -208,23 +214,16 @@ func TestObserverGetterAndSetter(t *testing.T) {
})

t.Run("should be able to update last tx scanned", func(t *testing.T) {
ob := createObserver(t, chain)
ob := createObserver(t, chain, defaultAlertLatency)

// update last tx scanned
newLastTxScanned := sample.EthAddress().String()
ob = ob.WithLastTxScanned(newLastTxScanned)
require.Equal(t, newLastTxScanned, ob.LastTxScanned())
})

t.Run("should be able to get rpc alert latency", func(t *testing.T) {
ob := createObserver(t, chain)

// get rpc alert latency
require.EqualValues(t, 60, ob.RPCAlertLatency())
})

t.Run("should be able to replace block cache", func(t *testing.T) {
ob := createObserver(t, chain)
ob := createObserver(t, chain, defaultAlertLatency)

// update block cache
newBlockCache, err := lru.New(200)
Expand All @@ -235,7 +234,7 @@ func TestObserverGetterAndSetter(t *testing.T) {
})

t.Run("should be able to replace header cache", func(t *testing.T) {
ob := createObserver(t, chain)
ob := createObserver(t, chain, defaultAlertLatency)

// update headers cache
newHeadersCache, err := lru.New(200)
Expand All @@ -246,7 +245,7 @@ func TestObserverGetterAndSetter(t *testing.T) {
})

t.Run("should be able to update telemetry server", func(t *testing.T) {
ob := createObserver(t, chain)
ob := createObserver(t, chain, defaultAlertLatency)

// update telemetry server
newServer := metrics.NewTelemetryServer()
Expand All @@ -255,7 +254,7 @@ func TestObserverGetterAndSetter(t *testing.T) {
})

t.Run("should be able to get logger", func(t *testing.T) {
ob := createObserver(t, chain)
ob := createObserver(t, chain, defaultAlertLatency)
logger := ob.Logger()

// should be able to print log
Expand Down Expand Up @@ -293,7 +292,7 @@ func TestOutboundID(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// create observer
ob := createObserver(t, tt.chain)
ob := createObserver(t, tt.chain, defaultAlertLatency)
ob = ob.WithTSS(tt.tss)

// get outbound id
Expand All @@ -315,7 +314,7 @@ func TestLoadLastBlockScanned(t *testing.T) {

t.Run("should be able to load last block scanned", func(t *testing.T) {
// create observer and open db
ob := createObserver(t, chain)
ob := createObserver(t, chain, defaultAlertLatency)

// create db and write 100 as last block scanned
err := ob.WriteLastBlockScannedToDB(100)
Expand All @@ -329,7 +328,7 @@ func TestLoadLastBlockScanned(t *testing.T) {

t.Run("latest block scanned should be 0 if not found in db", func(t *testing.T) {
// create observer and open db
ob := createObserver(t, chain)
ob := createObserver(t, chain, defaultAlertLatency)

// read last block scanned
err := ob.LoadLastBlockScanned(log.Logger)
Expand All @@ -339,7 +338,7 @@ func TestLoadLastBlockScanned(t *testing.T) {

t.Run("should overwrite last block scanned if env var is set", func(t *testing.T) {
// create observer and open db
ob := createObserver(t, chain)
ob := createObserver(t, chain, defaultAlertLatency)

// create db and write 100 as last block scanned
ob.WriteLastBlockScannedToDB(100)
Expand All @@ -355,7 +354,7 @@ func TestLoadLastBlockScanned(t *testing.T) {

t.Run("last block scanned should remain 0 if env var is set to latest", func(t *testing.T) {
// create observer and open db
ob := createObserver(t, chain)
ob := createObserver(t, chain, defaultAlertLatency)

// create db and write 100 as last block scanned
ob.WriteLastBlockScannedToDB(100)
Expand All @@ -371,7 +370,7 @@ func TestLoadLastBlockScanned(t *testing.T) {

t.Run("should return error on invalid env var", func(t *testing.T) {
// create observer and open db
ob := createObserver(t, chain)
ob := createObserver(t, chain, defaultAlertLatency)

// set invalid env var
os.Setenv(envvar, "invalid")
Expand All @@ -385,7 +384,7 @@ func TestLoadLastBlockScanned(t *testing.T) {
func TestSaveLastBlockScanned(t *testing.T) {
t.Run("should be able to save last block scanned", func(t *testing.T) {
// create observer and open db
ob := createObserver(t, chains.Ethereum)
ob := createObserver(t, chains.Ethereum, defaultAlertLatency)

// save 100 as last block scanned
err := ob.SaveLastBlockScanned(100)
Expand All @@ -405,7 +404,7 @@ func TestReadWriteDBLastBlockScanned(t *testing.T) {
chain := chains.Ethereum
t.Run("should be able to write and read last block scanned to db", func(t *testing.T) {
// create observer and open db
ob := createObserver(t, chain)
ob := createObserver(t, chain, defaultAlertLatency)

// write last block scanned
err := ob.WriteLastBlockScannedToDB(100)
Expand All @@ -418,7 +417,7 @@ func TestReadWriteDBLastBlockScanned(t *testing.T) {

t.Run("should return error when last block scanned not found in db", func(t *testing.T) {
// create empty db
ob := createObserver(t, chain)
ob := createObserver(t, chain, defaultAlertLatency)

lastScannedBlock, err := ob.ReadLastBlockScannedFromDB()
require.Error(t, err)
Expand All @@ -432,7 +431,7 @@ func TestLoadLastTxScanned(t *testing.T) {

t.Run("should be able to load last tx scanned", func(t *testing.T) {
// create observer and open db
ob := createObserver(t, chain)
ob := createObserver(t, chain, defaultAlertLatency)

// create db and write sample hash as last tx scanned
ob.WriteLastTxScannedToDB(lastTx)
Expand All @@ -444,7 +443,7 @@ func TestLoadLastTxScanned(t *testing.T) {

t.Run("latest tx scanned should be empty if not found in db", func(t *testing.T) {
// create observer and open db
ob := createObserver(t, chain)
ob := createObserver(t, chain, defaultAlertLatency)

// read last tx scanned
ob.LoadLastTxScanned()
Expand All @@ -453,7 +452,7 @@ func TestLoadLastTxScanned(t *testing.T) {

t.Run("should overwrite last tx scanned if env var is set", func(t *testing.T) {
// create observer and open db
ob := createObserver(t, chain)
ob := createObserver(t, chain, defaultAlertLatency)

// create db and write sample hash as last tx scanned
ob.WriteLastTxScannedToDB(lastTx)
Expand All @@ -472,7 +471,7 @@ func TestSaveLastTxScanned(t *testing.T) {
chain := chains.SolanaDevnet
t.Run("should be able to save last tx scanned", func(t *testing.T) {
// create observer and open db
ob := createObserver(t, chain)
ob := createObserver(t, chain, defaultAlertLatency)

// save random tx hash
lastSlot := uint64(100)
Expand All @@ -495,7 +494,7 @@ func TestReadWriteDBLastTxScanned(t *testing.T) {
chain := chains.SolanaDevnet
t.Run("should be able to write and read last tx scanned to db", func(t *testing.T) {
// create observer and open db
ob := createObserver(t, chain)
ob := createObserver(t, chain, defaultAlertLatency)

// write last tx scanned
lastTx := "5LuQMorgd11p8GWEw6pmyHCDtA26NUyeNFhLWPNk2oBoM9pkag1LzhwGSRos3j4TJLhKjswFhZkGtvSGdLDkmqsk"
Expand All @@ -509,7 +508,7 @@ func TestReadWriteDBLastTxScanned(t *testing.T) {

t.Run("should return error when last tx scanned not found in db", func(t *testing.T) {
// create empty db
ob := createObserver(t, chain)
ob := createObserver(t, chain, defaultAlertLatency)

lastTxScanned, err := ob.ReadLastTxScannedFromDB()
require.Error(t, err)
Expand All @@ -520,7 +519,7 @@ func TestReadWriteDBLastTxScanned(t *testing.T) {
func TestPostVoteInbound(t *testing.T) {
t.Run("should be able to post vote inbound", func(t *testing.T) {
// create observer
ob := createObserver(t, chains.Ethereum)
ob := createObserver(t, chains.Ethereum, defaultAlertLatency)

// create mock zetacore client
zetacoreClient := mocks.NewZetacoreClient(t)
Expand All @@ -535,6 +534,53 @@ func TestPostVoteInbound(t *testing.T) {
})
}

func TestAlertOnRPCLatency(t *testing.T) {
now := time.Now()

tests := []struct {
name string
blockTime time.Time
alertLatency time.Duration
alerted bool
}{
{
name: "should alert on high RPC latency",
blockTime: now.Add(-60 * time.Second),
alertLatency: 55,
alerted: true,
},
{
name: "should not alert on normal RPC latency",
blockTime: now.Add(-60 * time.Second),
alertLatency: 65,
alerted: false,
},
{
name: "should alert on higher RPC latency then default",
blockTime: now.Add(-65 * time.Second),
alertLatency: 0, // 0 means not set
alerted: true,
},
{
name: "should not alert on normal RPC latency compared to default",
blockTime: now.Add(-55 * time.Second),
alertLatency: 0, // 0 means not set
alerted: false,
},
}

// run tests
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// create observer
ob := createObserver(t, chains.Ethereum, tt.alertLatency)

alerted := ob.AlertOnRPCLatency(tt.blockTime, defaultAlertLatency)
require.Equal(t, tt.alerted, alerted)
})
}
}

func createDatabase(t *testing.T) *db.DB {
sqlDatabase, err := db.NewFromSqliteInMemory(true)
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion zetaclient/chains/bitcoin/observer/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func (ob *Observer) Start(ctx context.Context) {
bg.Work(ctx, ob.WatchInboundTracker, bg.WithName("WatchInboundTracker"), bg.WithLogger(ob.Logger().Inbound))

// watch the RPC status of the bitcoin chain
bg.Work(ctx, ob.WatchRPCStatus, bg.WithName("WatchRPCStatus"), bg.WithLogger(ob.Logger().Chain))
bg.Work(ctx, ob.watchRPCStatus, bg.WithName("watchRPCStatus"), bg.WithLogger(ob.Logger().Chain))
}

// GetPendingNonce returns the artificial pending nonce
Expand Down
Loading

0 comments on commit 826da7b

Please sign in to comment.