diff --git a/zetaclient/chains/base/observer.go b/zetaclient/chains/base/observer.go index a948ac396f..b213e63ee9 100644 --- a/zetaclient/chains/base/observer.go +++ b/zetaclient/chains/base/observer.go @@ -252,11 +252,6 @@ func (ob *Observer) WithLastTxScanned(txHash string) *Observer { return ob } -// RPCAlertLatency returns the RPC alert latency for the observer. -func (ob *Observer) RPCAlertLatency() time.Duration { - return ob.rpcAlertLatency -} - // BlockCache returns the block cache for the observer. func (ob *Observer) BlockCache() *lru.Cache { return ob.blockCache @@ -463,6 +458,27 @@ func (ob *Observer) PostVoteInbound( return ballot, err } +// AlertOnRPCLatency prints an alert if the RPC latency exceeds the threshold. +// Returns true if the RPC latency is too high. +func (ob *Observer) AlertOnRPCLatency(latestBlockTime time.Time, defaultAlertLatency time.Duration) bool { + // use configured alert latency if set + alertLatency := defaultAlertLatency + if ob.rpcAlertLatency > 0 { + alertLatency = ob.rpcAlertLatency + } + + // latest block should not be too old + elapsedTime := time.Since(latestBlockTime) + if elapsedTime > alertLatency { + ob.logger.Chain.Error(). + Msgf("RPC is stale: latest block is %.0f seconds old, RPC down or chain stuck (check explorer)?", elapsedTime.Seconds()) + return true + } + + ob.logger.Chain.Info().Msgf("RPC is OK: latest block is %.0f seconds old", elapsedTime.Seconds()) + return false +} + // EnvVarLatestBlockByChain returns the environment variable for the last block by chain. func EnvVarLatestBlockByChain(chain chains.Chain) string { return fmt.Sprintf("CHAIN_%d_SCAN_FROM_BLOCK", chain.ChainId) diff --git a/zetaclient/chains/base/observer_test.go b/zetaclient/chains/base/observer_test.go index 4db8a19bbc..88cd1f0cf5 100644 --- a/zetaclient/chains/base/observer_test.go +++ b/zetaclient/chains/base/observer_test.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "testing" + "time" lru "github.com/hashicorp/golang-lru" "github.com/rs/zerolog" @@ -23,8 +24,13 @@ import ( "github.com/zeta-chain/zetacore/zetaclient/testutils/mocks" ) +const ( + // defaultAlertLatency is the default alert latency for unit tests + defaultAlertLatency = 60 * time.Second +) + // createObserver creates a new observer for testing -func createObserver(t *testing.T, chain chains.Chain) *base.Observer { +func createObserver(t *testing.T, chain chains.Chain, alertLatency time.Duration) *base.Observer { // constructor parameters chainParams := *sample.ChainParams(chain.ChainId) zetacoreClient := mocks.NewZetacoreClient(t) @@ -41,7 +47,7 @@ func createObserver(t *testing.T, chain chains.Chain) *base.Observer { tss, base.DefaultBlockCacheSize, base.DefaultHeaderCacheSize, - 60, + alertLatency, nil, database, logger, @@ -143,7 +149,7 @@ func TestNewObserver(t *testing.T) { func TestStop(t *testing.T) { t.Run("should be able to stop observer", func(t *testing.T) { // create observer and initialize db - ob := createObserver(t, chains.Ethereum) + ob := createObserver(t, chains.Ethereum, defaultAlertLatency) // stop observer ob.Stop() @@ -154,7 +160,7 @@ func TestObserverGetterAndSetter(t *testing.T) { chain := chains.Ethereum t.Run("should be able to update chain", func(t *testing.T) { - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // update chain newChain := chains.BscMainnet @@ -163,7 +169,7 @@ func TestObserverGetterAndSetter(t *testing.T) { }) t.Run("should be able to update chain params", func(t *testing.T) { - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // update chain params newChainParams := *sample.ChainParams(chains.BscMainnet.ChainId) @@ -172,7 +178,7 @@ func TestObserverGetterAndSetter(t *testing.T) { }) t.Run("should be able to update zetacore client", func(t *testing.T) { - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // update zetacore client newZetacoreClient := mocks.NewZetacoreClient(t) @@ -181,7 +187,7 @@ func TestObserverGetterAndSetter(t *testing.T) { }) t.Run("should be able to update tss", func(t *testing.T) { - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // update tss newTSS := mocks.NewTSSAthens3() @@ -190,7 +196,7 @@ func TestObserverGetterAndSetter(t *testing.T) { }) t.Run("should be able to update last block", func(t *testing.T) { - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // update last block newLastBlock := uint64(100) @@ -199,7 +205,7 @@ func TestObserverGetterAndSetter(t *testing.T) { }) t.Run("should be able to update last block scanned", func(t *testing.T) { - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // update last block scanned newLastBlockScanned := uint64(100) @@ -208,7 +214,7 @@ func TestObserverGetterAndSetter(t *testing.T) { }) t.Run("should be able to update last tx scanned", func(t *testing.T) { - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // update last tx scanned newLastTxScanned := sample.EthAddress().String() @@ -216,15 +222,8 @@ func TestObserverGetterAndSetter(t *testing.T) { require.Equal(t, newLastTxScanned, ob.LastTxScanned()) }) - t.Run("should be able to get rpc alert latency", func(t *testing.T) { - ob := createObserver(t, chain) - - // get rpc alert latency - require.EqualValues(t, 60, ob.RPCAlertLatency()) - }) - t.Run("should be able to replace block cache", func(t *testing.T) { - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // update block cache newBlockCache, err := lru.New(200) @@ -235,7 +234,7 @@ func TestObserverGetterAndSetter(t *testing.T) { }) t.Run("should be able to replace header cache", func(t *testing.T) { - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // update headers cache newHeadersCache, err := lru.New(200) @@ -246,7 +245,7 @@ func TestObserverGetterAndSetter(t *testing.T) { }) t.Run("should be able to update telemetry server", func(t *testing.T) { - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // update telemetry server newServer := metrics.NewTelemetryServer() @@ -255,7 +254,7 @@ func TestObserverGetterAndSetter(t *testing.T) { }) t.Run("should be able to get logger", func(t *testing.T) { - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) logger := ob.Logger() // should be able to print log @@ -293,7 +292,7 @@ func TestOutboundID(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { // create observer - ob := createObserver(t, tt.chain) + ob := createObserver(t, tt.chain, defaultAlertLatency) ob = ob.WithTSS(tt.tss) // get outbound id @@ -315,7 +314,7 @@ func TestLoadLastBlockScanned(t *testing.T) { t.Run("should be able to load last block scanned", func(t *testing.T) { // create observer and open db - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // create db and write 100 as last block scanned err := ob.WriteLastBlockScannedToDB(100) @@ -329,7 +328,7 @@ func TestLoadLastBlockScanned(t *testing.T) { t.Run("latest block scanned should be 0 if not found in db", func(t *testing.T) { // create observer and open db - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // read last block scanned err := ob.LoadLastBlockScanned(log.Logger) @@ -339,7 +338,7 @@ func TestLoadLastBlockScanned(t *testing.T) { t.Run("should overwrite last block scanned if env var is set", func(t *testing.T) { // create observer and open db - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // create db and write 100 as last block scanned ob.WriteLastBlockScannedToDB(100) @@ -355,7 +354,7 @@ func TestLoadLastBlockScanned(t *testing.T) { t.Run("last block scanned should remain 0 if env var is set to latest", func(t *testing.T) { // create observer and open db - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // create db and write 100 as last block scanned ob.WriteLastBlockScannedToDB(100) @@ -371,7 +370,7 @@ func TestLoadLastBlockScanned(t *testing.T) { t.Run("should return error on invalid env var", func(t *testing.T) { // create observer and open db - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // set invalid env var os.Setenv(envvar, "invalid") @@ -385,7 +384,7 @@ func TestLoadLastBlockScanned(t *testing.T) { func TestSaveLastBlockScanned(t *testing.T) { t.Run("should be able to save last block scanned", func(t *testing.T) { // create observer and open db - ob := createObserver(t, chains.Ethereum) + ob := createObserver(t, chains.Ethereum, defaultAlertLatency) // save 100 as last block scanned err := ob.SaveLastBlockScanned(100) @@ -405,7 +404,7 @@ func TestReadWriteDBLastBlockScanned(t *testing.T) { chain := chains.Ethereum t.Run("should be able to write and read last block scanned to db", func(t *testing.T) { // create observer and open db - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // write last block scanned err := ob.WriteLastBlockScannedToDB(100) @@ -418,7 +417,7 @@ func TestReadWriteDBLastBlockScanned(t *testing.T) { t.Run("should return error when last block scanned not found in db", func(t *testing.T) { // create empty db - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) lastScannedBlock, err := ob.ReadLastBlockScannedFromDB() require.Error(t, err) @@ -432,7 +431,7 @@ func TestLoadLastTxScanned(t *testing.T) { t.Run("should be able to load last tx scanned", func(t *testing.T) { // create observer and open db - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // create db and write sample hash as last tx scanned ob.WriteLastTxScannedToDB(lastTx) @@ -444,7 +443,7 @@ func TestLoadLastTxScanned(t *testing.T) { t.Run("latest tx scanned should be empty if not found in db", func(t *testing.T) { // create observer and open db - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // read last tx scanned ob.LoadLastTxScanned() @@ -453,7 +452,7 @@ func TestLoadLastTxScanned(t *testing.T) { t.Run("should overwrite last tx scanned if env var is set", func(t *testing.T) { // create observer and open db - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // create db and write sample hash as last tx scanned ob.WriteLastTxScannedToDB(lastTx) @@ -472,7 +471,7 @@ func TestSaveLastTxScanned(t *testing.T) { chain := chains.SolanaDevnet t.Run("should be able to save last tx scanned", func(t *testing.T) { // create observer and open db - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // save random tx hash lastSlot := uint64(100) @@ -495,7 +494,7 @@ func TestReadWriteDBLastTxScanned(t *testing.T) { chain := chains.SolanaDevnet t.Run("should be able to write and read last tx scanned to db", func(t *testing.T) { // create observer and open db - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // write last tx scanned lastTx := "5LuQMorgd11p8GWEw6pmyHCDtA26NUyeNFhLWPNk2oBoM9pkag1LzhwGSRos3j4TJLhKjswFhZkGtvSGdLDkmqsk" @@ -509,7 +508,7 @@ func TestReadWriteDBLastTxScanned(t *testing.T) { t.Run("should return error when last tx scanned not found in db", func(t *testing.T) { // create empty db - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) lastTxScanned, err := ob.ReadLastTxScannedFromDB() require.Error(t, err) @@ -520,7 +519,7 @@ func TestReadWriteDBLastTxScanned(t *testing.T) { func TestPostVoteInbound(t *testing.T) { t.Run("should be able to post vote inbound", func(t *testing.T) { // create observer - ob := createObserver(t, chains.Ethereum) + ob := createObserver(t, chains.Ethereum, defaultAlertLatency) // create mock zetacore client zetacoreClient := mocks.NewZetacoreClient(t) @@ -535,6 +534,53 @@ func TestPostVoteInbound(t *testing.T) { }) } +func TestAlertOnRPCLatency(t *testing.T) { + now := time.Now() + + tests := []struct { + name string + blockTime time.Time + alertLatency time.Duration + alerted bool + }{ + { + name: "should alert on high RPC latency", + blockTime: now.Add(-60 * time.Second), + alertLatency: 55, + alerted: true, + }, + { + name: "should not alert on normal RPC latency", + blockTime: now.Add(-60 * time.Second), + alertLatency: 65, + alerted: false, + }, + { + name: "should alert on higher RPC latency then default", + blockTime: now.Add(-65 * time.Second), + alertLatency: 0, // 0 means not set + alerted: true, + }, + { + name: "should not alert on normal RPC latency compared to default", + blockTime: now.Add(-55 * time.Second), + alertLatency: 0, // 0 means not set + alerted: false, + }, + } + + // run tests + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // create observer + ob := createObserver(t, chains.Ethereum, tt.alertLatency) + + alerted := ob.AlertOnRPCLatency(tt.blockTime, defaultAlertLatency) + require.Equal(t, tt.alerted, alerted) + }) + } +} + func createDatabase(t *testing.T) *db.DB { sqlDatabase, err := db.NewFromSqliteInMemory(true) require.NoError(t, err) diff --git a/zetaclient/chains/bitcoin/observer/observer.go b/zetaclient/chains/bitcoin/observer/observer.go index 7a74164fff..6feeffbbe0 100644 --- a/zetaclient/chains/bitcoin/observer/observer.go +++ b/zetaclient/chains/bitcoin/observer/observer.go @@ -223,7 +223,7 @@ func (ob *Observer) Start(ctx context.Context) { bg.Work(ctx, ob.WatchInboundTracker, bg.WithName("WatchInboundTracker"), bg.WithLogger(ob.Logger().Inbound)) // watch the RPC status of the bitcoin chain - bg.Work(ctx, ob.WatchRPCStatus, bg.WithName("WatchRPCStatus"), bg.WithLogger(ob.Logger().Chain)) + bg.Work(ctx, ob.watchRPCStatus, bg.WithName("watchRPCStatus"), bg.WithLogger(ob.Logger().Chain)) } // GetPendingNonce returns the artificial pending nonce diff --git a/zetaclient/chains/bitcoin/observer/rpc_status.go b/zetaclient/chains/bitcoin/observer/rpc_status.go index f9f6ad97b4..6e99ebf024 100644 --- a/zetaclient/chains/bitcoin/observer/rpc_status.go +++ b/zetaclient/chains/bitcoin/observer/rpc_status.go @@ -8,8 +8,8 @@ import ( "github.com/zeta-chain/zetacore/zetaclient/common" ) -// WatchRPCStatus watches the RPC status of the Bitcoin chain -func (ob *Observer) WatchRPCStatus(_ context.Context) error { +// watchRPCStatus watches the RPC status of the Bitcoin chain +func (ob *Observer) watchRPCStatus(_ context.Context) error { ob.Logger().Chain.Info().Msgf("WatchRPCStatus started for chain %d", ob.Chain().ChainId) ticker := time.NewTicker(common.RPCStatusCheckInterval) @@ -20,15 +20,22 @@ func (ob *Observer) WatchRPCStatus(_ context.Context) error { continue } - alertLatency := ob.RPCAlertLatency() - tssAddress := ob.TSS().BTCAddressWitnessPubkeyHash() - err := rpc.CheckRPCStatus(ob.btcClient, alertLatency, tssAddress, ob.Logger().Chain) - if err != nil { - ob.Logger().Chain.Error().Err(err).Msg("RPC Status error") - } - + ob.checkRPCStatus() case <-ob.StopChannel(): return nil } } } + +// checkRPCStatus checks the RPC status of the Bitcoin chain +func (ob *Observer) checkRPCStatus() { + tssAddress := ob.TSS().BTCAddressWitnessPubkeyHash() + blockTime, err := rpc.CheckRPCStatus(ob.btcClient, tssAddress) + if err != nil { + ob.Logger().Chain.Error().Err(err).Msg("CheckRPCStatus failed") + return + } + + // alert if RPC latency is too high + ob.AlertOnRPCLatency(blockTime, rpc.RPCAlertLatency) +} diff --git a/zetaclient/chains/bitcoin/rpc/rpc.go b/zetaclient/chains/bitcoin/rpc/rpc.go index a265dc45f3..472e8b0e0f 100644 --- a/zetaclient/chains/bitcoin/rpc/rpc.go +++ b/zetaclient/chains/bitcoin/rpc/rpc.go @@ -10,7 +10,6 @@ import ( "github.com/btcsuite/btcd/rpcclient" "github.com/btcsuite/btcutil" "github.com/pkg/errors" - "github.com/rs/zerolog" "github.com/zeta-chain/zetacore/zetaclient/chains/bitcoin" "github.com/zeta-chain/zetacore/zetaclient/chains/interfaces" @@ -165,59 +164,36 @@ func GetRecentFeeRate(rpcClient interfaces.BTCRPCClient, netParams *chaincfg.Par return uint64(highestRate), nil } -// CheckRPCStatus checks the RPC status of the evm chain -func CheckRPCStatus( - client interfaces.BTCRPCClient, - alertLatency time.Duration, - tssAddress btcutil.Address, - logger zerolog.Logger, -) error { +// CheckRPCStatus checks the RPC status of the bitcoin chain +func CheckRPCStatus(client interfaces.BTCRPCClient, tssAddress btcutil.Address) (time.Time, error) { // query latest block number bn, err := client.GetBlockCount() if err != nil { - return errors.Wrap(err, "RPC error onGetBlockCount: RPC down?") + return time.Time{}, errors.Wrap(err, "RPC failed on GetBlockCount, RPC down?") } // query latest block header hash, err := client.GetBlockHash(bn) if err != nil { - return errors.Wrap(err, "RPC error onGetBlockHash: RPC down?") + return time.Time{}, errors.Wrap(err, "RPC failed on GetBlockHash, RPC down?") } // query latest block header thru hash header, err := client.GetBlockHeader(hash) if err != nil { - return errors.Wrap(err, "RPC error onGetBlockHeader: RPC down?") - } - - // use default alert latency if not provided - if alertLatency <= 0 { - alertLatency = RPCAlertLatency - } - - // latest block should not be too old - blockTime := header.Timestamp - elapsedTime := time.Since(blockTime) - if elapsedTime > alertLatency { - return errors.Errorf( - "Latest block %d is %.0fs old, RPC stale or chain stuck (check explorer)?", - bn, - elapsedTime.Seconds(), - ) + return time.Time{}, errors.Wrap(err, "RPC failed on GetBlockHeader, RPC down?") } // should be able to list utxos owned by TSS address res, err := client.ListUnspentMinMaxAddresses(0, 1000000, []btcutil.Address{tssAddress}) if err != nil { - return errors.Wrap(err, "can't list utxos of TSS address; TSS address is not imported?") + return time.Time{}, errors.Wrap(err, "can't list utxos of TSS address; TSS address is not imported?") } // TSS address should have utxos if len(res) == 0 { - return errors.New("TSS address has no utxos; TSS address is not imported?") + return time.Time{}, errors.New("TSS address has no utxos; TSS address is not imported?") } - logger.Info(). - Msgf("RPC Status [OK]: latest block %d, timestamp %s (%.fs ago), tss addr %s, #utxos: %d", bn, blockTime, elapsedTime.Seconds(), tssAddress, len(res)) - return nil + return header.Timestamp, nil } diff --git a/zetaclient/chains/bitcoin/rpc/rpc_live_test.go b/zetaclient/chains/bitcoin/rpc/rpc_live_test.go index cde5fae2b7..4e12f33507 100644 --- a/zetaclient/chains/bitcoin/rpc/rpc_live_test.go +++ b/zetaclient/chains/bitcoin/rpc/rpc_live_test.go @@ -265,7 +265,7 @@ func LiveTestCheckRPCStatus(t *testing.T) { require.NoError(t, err) // check RPC status - err = rpc.CheckRPCStatus(client, rpc.RPCAlertLatency, tssAddress, log.Logger) + _, err = rpc.CheckRPCStatus(client, tssAddress) require.NoError(t, err) } diff --git a/zetaclient/chains/evm/observer/observer.go b/zetaclient/chains/evm/observer/observer.go index 5ed0acf136..3ac07cd375 100644 --- a/zetaclient/chains/evm/observer/observer.go +++ b/zetaclient/chains/evm/observer/observer.go @@ -200,7 +200,7 @@ func (ob *Observer) Start(ctx context.Context) { bg.Work(ctx, ob.WatchOutbound, bg.WithName("WatchOutbound"), bg.WithLogger(ob.Logger().Outbound)) bg.Work(ctx, ob.WatchGasPrice, bg.WithName("WatchGasPrice"), bg.WithLogger(ob.Logger().GasPrice)) bg.Work(ctx, ob.WatchInboundTracker, bg.WithName("WatchInboundTracker"), bg.WithLogger(ob.Logger().Inbound)) - bg.Work(ctx, ob.WatchRPCStatus, bg.WithName("WatchRPCStatus"), bg.WithLogger(ob.Logger().Chain)) + bg.Work(ctx, ob.watchRPCStatus, bg.WithName("watchRPCStatus"), bg.WithLogger(ob.Logger().Chain)) } // SetTxNReceipt sets the receipt and transaction in memory diff --git a/zetaclient/chains/evm/observer/rpc_status.go b/zetaclient/chains/evm/observer/rpc_status.go index 525c20b1ba..cd2ad4d917 100644 --- a/zetaclient/chains/evm/observer/rpc_status.go +++ b/zetaclient/chains/evm/observer/rpc_status.go @@ -9,9 +9,9 @@ import ( "github.com/zeta-chain/zetacore/zetaclient/common" ) -// WatchRPCStatus watches the RPC status of the evm chain -func (ob *Observer) WatchRPCStatus(ctx context.Context) error { - ob.Logger().Chain.Info().Msgf("WatchRPCStatus started for chain %d", ob.Chain().ChainId) +// watchRPCStatus watches the RPC status of the EVM chain +func (ob *Observer) watchRPCStatus(ctx context.Context) error { + ob.Logger().Chain.Info().Msgf("watchRPCStatus started for chain %d", ob.Chain().ChainId) ticker := time.NewTicker(common.RPCStatusCheckInterval) for { @@ -21,13 +21,21 @@ func (ob *Observer) WatchRPCStatus(ctx context.Context) error { continue } - alertLatency := ob.RPCAlertLatency() - err := rpc.CheckRPCStatus(ctx, ob.evmClient, alertLatency, ob.Logger().Chain) - if err != nil { - ob.Logger().Chain.Error().Err(err).Msg("RPC Status error") - } + ob.checkRPCStatus(ctx) case <-ob.StopChannel(): return nil } } } + +// checkRPCStatus checks the RPC status of the EVM chain +func (ob *Observer) checkRPCStatus(ctx context.Context) { + blockTime, err := rpc.CheckRPCStatus(ctx, ob.evmClient) + if err != nil { + ob.Logger().Chain.Error().Err(err).Msg("CheckRPCStatus failed") + return + } + + // alert if RPC latency is too high + ob.AlertOnRPCLatency(blockTime, rpc.RPCAlertLatency) +} diff --git a/zetaclient/chains/evm/rpc/rpc.go b/zetaclient/chains/evm/rpc/rpc.go index 22bea55110..4ae5441fc8 100644 --- a/zetaclient/chains/evm/rpc/rpc.go +++ b/zetaclient/chains/evm/rpc/rpc.go @@ -7,7 +7,6 @@ import ( ethcommon "github.com/ethereum/go-ethereum/common" "github.com/pkg/errors" - "github.com/rs/zerolog" "github.com/zeta-chain/zetacore/zetaclient/chains/interfaces" ) @@ -61,48 +60,28 @@ func IsTxConfirmed( } // CheckRPCStatus checks the RPC status of the evm chain -func CheckRPCStatus( - ctx context.Context, - client interfaces.EVMRPCClient, - alertLatency time.Duration, - logger zerolog.Logger, -) error { +func CheckRPCStatus(ctx context.Context, client interfaces.EVMRPCClient) (time.Time, error) { // query latest block number bn, err := client.BlockNumber(ctx) if err != nil { - return errors.Wrap(err, "RPC error onBlockNumber: RPC down?") + return time.Time{}, errors.Wrap(err, "RPC failed on BlockNumber, RPC down?") } // query suggested gas price - gasPrice, err := client.SuggestGasPrice(ctx) + _, err = client.SuggestGasPrice(ctx) if err != nil { - return errors.Wrap(err, "RPC error onSuggestGasPrice: RPC down?") + return time.Time{}, errors.Wrap(err, "RPC failed on SuggestGasPrice, RPC down?") } // query latest block header header, err := client.HeaderByNumber(ctx, new(big.Int).SetUint64(bn)) if err != nil { - return errors.Wrap(err, "RPC error onHeaderByNumber: RPC down?") + return time.Time{}, errors.Wrap(err, "RPC failed on HeaderByNumber, RPC down?") } - // use default alert latency if not provided - if alertLatency <= 0 { - alertLatency = RPCAlertLatency - } - - // latest block should not be too old + // convert block time to UTC // #nosec G115 always in range blockTime := time.Unix(int64(header.Time), 0).UTC() - elapsedTime := time.Since(blockTime) - if elapsedTime > alertLatency { - return errors.Errorf( - "Latest block %d is %.0fs old, RPC stale or chain stuck (check explorer)?", - bn, - elapsedTime.Seconds(), - ) - } - logger.Info(). - Msgf("RPC Status [OK]: latest block %d, timestamp %s (%.0fs ago), gas price %s", header.Number, blockTime.String(), elapsedTime.Seconds(), gasPrice.String()) - return nil + return blockTime, nil } diff --git a/zetaclient/chains/evm/rpc/rpc_live_test.go b/zetaclient/chains/evm/rpc/rpc_live_test.go index c6f2ad2368..5e0f7ca59a 100644 --- a/zetaclient/chains/evm/rpc/rpc_live_test.go +++ b/zetaclient/chains/evm/rpc/rpc_live_test.go @@ -5,7 +5,6 @@ import ( "math" "github.com/ethereum/go-ethereum/ethclient" - "github.com/rs/zerolog/log" "github.com/stretchr/testify/require" "github.com/zeta-chain/zetacore/zetaclient/chains/evm/rpc" "github.com/zeta-chain/zetacore/zetaclient/common" @@ -56,6 +55,6 @@ func LiveTest_CheckRPCStatus(t *testing.T) { require.NoError(t, err) ctx := context.Background() - err = rpc.CheckRPCStatus(ctx, client, rpc.RPCAlertLatency, log.Logger) + _, err = rpc.CheckRPCStatus(ctx, client) require.NoError(t, err) } diff --git a/zetaclient/chains/solana/observer/observer.go b/zetaclient/chains/solana/observer/observer.go index cc2e3a8204..d44f8e0803 100644 --- a/zetaclient/chains/solana/observer/observer.go +++ b/zetaclient/chains/solana/observer/observer.go @@ -135,7 +135,7 @@ func (ob *Observer) Start(ctx context.Context) { bg.Work(ctx, ob.WatchInboundTracker, bg.WithName("WatchInboundTracker"), bg.WithLogger(ob.Logger().Inbound)) // watch RPC status of the Solana chain - bg.Work(ctx, ob.WatchRPCStatus, bg.WithName("WatchRPCStatus"), bg.WithLogger(ob.Logger().Chain)) + bg.Work(ctx, ob.watchRPCStatus, bg.WithName("watchRPCStatus"), bg.WithLogger(ob.Logger().Chain)) } // LoadLastTxScanned loads the last scanned tx from the database. diff --git a/zetaclient/chains/solana/observer/rpc_status.go b/zetaclient/chains/solana/observer/rpc_status.go index 7910ff6f7e..a01d128683 100644 --- a/zetaclient/chains/solana/observer/rpc_status.go +++ b/zetaclient/chains/solana/observer/rpc_status.go @@ -4,13 +4,14 @@ import ( "context" "time" + "github.com/zeta-chain/zetacore/pkg/chains" "github.com/zeta-chain/zetacore/zetaclient/chains/solana/rpc" "github.com/zeta-chain/zetacore/zetaclient/common" ) -// WatchRPCStatus watches the RPC status of the solana chain -func (ob *Observer) WatchRPCStatus(ctx context.Context) error { - ob.Logger().Chain.Info().Msgf("WatchRPCStatus started for chain %d", ob.Chain().ChainId) +// watchRPCStatus watches the RPC status of the Solana chain +func (ob *Observer) watchRPCStatus(ctx context.Context) error { + ob.Logger().Chain.Info().Msgf("watchRPCStatus started for chain %d", ob.Chain().ChainId) ticker := time.NewTicker(common.RPCStatusCheckInterval) for { @@ -20,13 +21,25 @@ func (ob *Observer) WatchRPCStatus(ctx context.Context) error { continue } - alertLatency := ob.RPCAlertLatency() - err := rpc.CheckRPCStatus(ctx, ob.solClient, alertLatency, ob.Logger().Chain) - if err != nil { - ob.Logger().Chain.Error().Err(err).Msg("RPC Status error") - } + ob.checkRPCStatus(ctx) case <-ob.StopChannel(): return nil } } } + +// checkRPCStatus checks the RPC status of the Solana chain +func (ob *Observer) checkRPCStatus(ctx context.Context) { + // Solana privnet doesn't have RPC 'GetHealth', need to differentiate + privnet := ob.Chain().NetworkType == chains.NetworkType_privnet + + // check the RPC status + blockTime, err := rpc.CheckRPCStatus(ctx, ob.solClient, privnet) + if err != nil { + ob.Logger().Chain.Error().Err(err).Msg("CheckRPCStatus failed") + return + } + + // alert if RPC latency is too high + ob.AlertOnRPCLatency(blockTime, rpc.RPCAlertLatency) +} diff --git a/zetaclient/chains/solana/rpc/rpc.go b/zetaclient/chains/solana/rpc/rpc.go index c5cf6b0989..c1f378365b 100644 --- a/zetaclient/chains/solana/rpc/rpc.go +++ b/zetaclient/chains/solana/rpc/rpc.go @@ -7,7 +7,6 @@ import ( "github.com/gagliardetto/solana-go" "github.com/gagliardetto/solana-go/rpc" "github.com/pkg/errors" - "github.com/rs/zerolog" "github.com/zeta-chain/zetacore/zetaclient/chains/interfaces" ) @@ -124,46 +123,26 @@ func GetSignaturesForAddressUntil( } // CheckRPCStatus checks the RPC status of the solana chain -func CheckRPCStatus( - ctx context.Context, - client interfaces.SolanaRPCClient, - alertLatency time.Duration, - logger zerolog.Logger, -) error { +func CheckRPCStatus(ctx context.Context, client interfaces.SolanaRPCClient, privnet bool) (time.Time, error) { // query solana health (always return "ok" unless --trusted-validator is provided) - _, err := client.GetHealth(ctx) - if err != nil { - return errors.Wrap(err, "RPC error onGetHealth: RPC down?") + if !privnet { + _, err := client.GetHealth(ctx) + if err != nil { + return time.Time{}, errors.Wrap(err, "RPC failed on GetHealth, RPC down?") + } } // query latest slot slot, err := client.GetSlot(ctx, rpc.CommitmentFinalized) if err != nil { - return errors.Wrap(err, "RPC error onGetSlot: RPC down?") + return time.Time{}, errors.Wrap(err, "RPC failed on GetSlot, RPC down?") } // query latest block time blockTime, err := client.GetBlockTime(ctx, slot) if err != nil { - return errors.Wrap(err, "RPC error onGetBlockTime: RPC down?") - } - - // use default alert latency if not provided - if alertLatency <= 0 { - alertLatency = RPCAlertLatency - } - - // latest block should not be too old - elapsedTime := time.Since(blockTime.Time()) - if elapsedTime > alertLatency { - return errors.Errorf( - "Latest slot %d is %.0fs old, RPC stale or chain stuck (check explorer)?", - slot, - elapsedTime.Seconds(), - ) + return time.Time{}, errors.Wrap(err, "RPC failed on GetBlockTime, RPC down?") } - logger.Info(). - Msgf("RPC Status [OK]: latest slot %d, timestamp %s (%.0fs ago)", slot, blockTime.String(), elapsedTime.Seconds()) - return nil + return blockTime.Time(), nil } diff --git a/zetaclient/chains/solana/rpc/rpc_live_test.go b/zetaclient/chains/solana/rpc/rpc_live_test.go index 35c1b6ae70..51c51c2f61 100644 --- a/zetaclient/chains/solana/rpc/rpc_live_test.go +++ b/zetaclient/chains/solana/rpc/rpc_live_test.go @@ -6,7 +6,6 @@ import ( "github.com/gagliardetto/solana-go" solanarpc "github.com/gagliardetto/solana-go/rpc" - "github.com/rs/zerolog/log" "github.com/stretchr/testify/require" "github.com/zeta-chain/zetacore/zetaclient/chains/solana/rpc" "github.com/zeta-chain/zetacore/zetaclient/common" @@ -68,6 +67,6 @@ func LiveTest_CheckRPCStatus(t *testing.T) { // check the RPC status ctx := context.Background() - err := rpc.CheckRPCStatus(ctx, client, rpc.RPCAlertLatency, log.Logger) + _, err := rpc.CheckRPCStatus(ctx, client, false) require.NoError(t, err) }