Skip to content

Commit

Permalink
Merge branch 'refactor-integrate-base-signer' into refactor-integrate…
Browse files Browse the repository at this point in the history
…-base-signer-observer-into-existing-structs
  • Loading branch information
ws4charlie authored Jun 20, 2024
2 parents 788a9b2 + 921ca67 commit 7faa962
Show file tree
Hide file tree
Showing 20 changed files with 1,173 additions and 1,521 deletions.
2 changes: 1 addition & 1 deletion changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
* [2296](https://github.com/zeta-chain/node/pull/2296) - move `testdata` package to `testutil` to organize test-related utilities
* [2344](https://github.com/zeta-chain/node/pull/2344) - group common data of EVM/Bitcoin signer and observer using base structs
* [2317](https://github.com/zeta-chain/node/pull/2317) - add ValidateOutbound method for cctx orchestrator
* [2355](https://github.com/zeta-chain/node/pull/2355) - integrated base Signer/Observer structures into EVM/Bitcoin Signer and Observer
* [2357](https://github.com/zeta-chain/node/pull/2357) - integrated base Signer structure into EVM/Bitcoin Signer

### Tests

Expand Down
66 changes: 3 additions & 63 deletions cmd/zetaclientd/utils.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
package main

import (
"fmt"

"github.com/btcsuite/btcd/rpcclient"
sdk "github.com/cosmos/cosmos-sdk/types"
ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"

"github.com/zeta-chain/zetacore/zetaclient/authz"
"github.com/zeta-chain/zetacore/zetaclient/chains/base"
Expand Down Expand Up @@ -120,84 +116,28 @@ func CreateChainObserverMap(
logger base.Logger,
ts *metrics.TelemetryServer,
) (map[int64]interfaces.ChainObserver, error) {
zetacoreContext := appContext.ZetacoreContext()
observerMap := make(map[int64]interfaces.ChainObserver)
// EVM observers
for _, evmConfig := range appContext.Config().GetAllEVMConfigs() {
if evmConfig.Chain.IsZetaChain() {
continue
}
chainParams, found := zetacoreContext.GetEVMChainParams(evmConfig.Chain.ChainId)
_, found := appContext.ZetacoreContext().GetEVMChainParams(evmConfig.Chain.ChainId)
if !found {
logger.Std.Error().Msgf("ChainParam not found for chain %s", evmConfig.Chain.String())
continue
}

// create EVM client
evmClient, err := ethclient.Dial(evmConfig.Endpoint)
if err != nil {
logger.Std.Error().Err(err).Msgf("error dailing endpoint %s", evmConfig.Endpoint)
continue
}

// create EVM chain observer
co, err := evmobserver.NewObserver(
evmConfig,
evmClient,
*chainParams,
zetacoreContext,
zetacoreClient,
tss,
dbpath,
logger,
ts,
)
co, err := evmobserver.NewObserver(appContext, zetacoreClient, tss, dbpath, logger, evmConfig, ts)
if err != nil {
logger.Std.Error().Err(err).Msgf("NewObserver error for evm chain %s", evmConfig.Chain.String())
continue
}
observerMap[evmConfig.Chain.ChainId] = co
}

// BTC observer
_, chainParams, found := zetacoreContext.GetBTCChainParams()
if !found {
return nil, fmt.Errorf("bitcoin chains params not found")
}

// create BTC chain observer
btcChain, btcConfig, enabled := appContext.GetBTCChainAndConfig()
if enabled {
// create BTC client
connCfg := &rpcclient.ConnConfig{
Host: btcConfig.RPCHost,
User: btcConfig.RPCUsername,
Pass: btcConfig.RPCPassword,
HTTPPostMode: true,
DisableTLS: true,
Params: btcConfig.RPCParams,
}
btcClient, err := rpcclient.New(connCfg, nil)
if err != nil {
return nil, fmt.Errorf("error creating rpc client: %s", err)
}
err = btcClient.Ping()
if err != nil {
return nil, fmt.Errorf("error ping the bitcoin server: %s", err)
}

// create BTC chain observer
co, err := btcobserver.NewObserver(
btcChain,
btcClient,
*chainParams,
zetacoreContext,
zetacoreClient,
tss,
dbpath,
logger,
ts,
)
co, err := btcobserver.NewObserver(appContext, btcChain, zetacoreClient, tss, dbpath, logger, btcConfig, ts)
if err != nil {
logger.Std.Error().Err(err).Msgf("NewObserver error for bitcoin chain %s", btcChain.String())

Expand Down
136 changes: 45 additions & 91 deletions zetaclient/chains/base/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/rs/zerolog"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
"gorm.io/gorm/logger"

"github.com/zeta-chain/zetacore/pkg/chains"
observertypes "github.com/zeta-chain/zetacore/x/observer/types"
Expand All @@ -29,12 +28,9 @@ const (
// Cached blocks can be used to get block information and verify transactions
DefaultBlockCacheSize = 1000

// DefaultHeaderCacheSize is the default number of headers that the observer will keep in cache for performance (without RPC calls)
// DefaultHeadersCacheSize is the default number of headers that the observer will keep in cache for performance (without RPC calls)
// Cached headers can be used to get header information
DefaultHeaderCacheSize = 1000

// TempSQLiteDBPath is the temporary in-memory SQLite database used for testing
TempSQLiteDBPath = "file::memory:?cache=shared"
DefaultHeadersCacheSize = 1000
)

// Observer is the base structure for chain observers, grouping the common logic for each chain observer client.
Expand Down Expand Up @@ -88,7 +84,8 @@ func NewObserver(
zetacoreClient interfaces.ZetacoreClient,
tss interfaces.TSSSigner,
blockCacheSize int,
headerCacheSize int,
headersCacheSize int,
dbPath string,
ts *metrics.TelemetryServer,
logger Logger,
) (*Observer, error) {
Expand All @@ -115,25 +112,18 @@ func NewObserver(
}

// create header cache
ob.headerCache, err = lru.New(headerCacheSize)
ob.headerCache, err = lru.New(headersCacheSize)
if err != nil {
return nil, errors.Wrap(err, "error creating header cache")
}

return &ob, nil
}

// Stop notifies all goroutines to stop and closes the database.
func (ob *Observer) Stop() {
ob.logger.Chain.Info().Msgf("observer is stopping for chain %d", ob.Chain().ChainId)
close(ob.stop)

// close database
err := ob.CloseDB()
// open database
err = ob.OpenDB(dbPath)
if err != nil {
ob.Logger().Chain.Error().Err(err).Msgf("CloseDB failed for chain %d", ob.Chain().ChainId)
return nil, errors.Wrap(err, fmt.Sprintf("error opening observer db for chain: %s", chain.ChainName))
}
ob.Logger().Chain.Info().Msgf("observer stopped for chain %d", ob.Chain().ChainId)

return &ob, nil
}

// Chain returns the chain for the observer.
Expand Down Expand Up @@ -242,20 +232,9 @@ func (ob *Observer) DB() *gorm.DB {
return ob.db
}

// WithTelemetryServer attaches a new telemetry server to the observer.
func (ob *Observer) WithTelemetryServer(ts *metrics.TelemetryServer) *Observer {
ob.ts = ts
return ob
}

// TelemetryServer returns the telemetry server for the observer.
func (ob *Observer) TelemetryServer() *metrics.TelemetryServer {
return ob.ts
}

// Logger returns the logger for the observer.
func (ob *Observer) Logger() *ObserverLogger {
return &ob.logger
func (ob *Observer) Logger() ObserverLogger {
return ob.logger
}

// WithLogger attaches a new logger to the observer.
Expand All @@ -272,64 +251,45 @@ func (ob *Observer) WithLogger(logger Logger) *Observer {
return ob
}

// StopChannel returns the stop channel for the observer.
func (ob *Observer) StopChannel() chan struct{} {
// Stop returns the stop channel for the observer.
func (ob *Observer) Stop() chan struct{} {
return ob.stop
}

// OpenDB open sql database in the given path.
func (ob *Observer) OpenDB(dbPath string, dbName string) error {
// create db path if not exist
if _, err := os.Stat(dbPath); os.IsNotExist(err) {
err := os.MkdirAll(dbPath, os.ModePerm)
if err != nil {
return errors.Wrapf(err, "error creating db path: %s", dbPath)
func (ob *Observer) OpenDB(dbPath string) error {
if dbPath != "" {
// create db path if not exist
if _, err := os.Stat(dbPath); os.IsNotExist(err) {
err := os.MkdirAll(dbPath, os.ModePerm)
if err != nil {
return errors.Wrap(err, "error creating db path")
}
}
}

// use custom dbName or chain name if not provided
if dbName == "" {
dbName = ob.chain.ChainName.String()
}
path := fmt.Sprintf("%s/%s", dbPath, dbName)

// use memory db if specified
if dbPath == TempSQLiteDBPath {
path = TempSQLiteDBPath
}

// open db
db, err := gorm.Open(sqlite.Open(path), &gorm.Config{Logger: logger.Default.LogMode(logger.Silent)})
if err != nil {
return errors.Wrap(err, "error opening db")
}

// migrate db
err = db.AutoMigrate(&clienttypes.LastBlockSQLType{})
if err != nil {
return errors.Wrap(err, "error migrating db")
}
ob.db = db

return nil
}
// open db by chain name
chainName := ob.chain.ChainName.String()
path := fmt.Sprintf("%s/%s", dbPath, chainName)
db, err := gorm.Open(sqlite.Open(path), &gorm.Config{})
if err != nil {
return errors.Wrap(err, "error opening db")
}

// CloseDB close the database.
func (ob *Observer) CloseDB() error {
dbInst, err := ob.db.DB()
if err != nil {
return fmt.Errorf("error getting database instance: %w", err)
}
err = dbInst.Close()
if err != nil {
return fmt.Errorf("error closing database: %w", err)
// migrate db
err = db.AutoMigrate(&clienttypes.ReceiptSQLType{},
&clienttypes.TransactionSQLType{},
&clienttypes.LastBlockSQLType{})
if err != nil {
return errors.Wrap(err, "error migrating db")
}
ob.db = db
}
return nil
}

// LoadLastBlockScanned loads last scanned block from environment variable or from database.
// The last scanned block is the height from which the observer should continue scanning.
func (ob *Observer) LoadLastBlockScanned(logger zerolog.Logger) error {
func (ob *Observer) LoadLastBlockScanned(logger zerolog.Logger) (fromLatest bool, err error) {
// get environment variable
envvar := EnvVarLatestBlockByChain(ob.chain)
scanFromBlock := os.Getenv(envvar)
Expand All @@ -339,33 +299,27 @@ func (ob *Observer) LoadLastBlockScanned(logger zerolog.Logger) error {
logger.Info().
Msgf("LoadLastBlockScanned: envvar %s is set; scan from block %s", envvar, scanFromBlock)
if scanFromBlock == EnvVarLatestBlock {
return nil
return true, nil
}
blockNumber, err := strconv.ParseUint(scanFromBlock, 10, 64)
if err != nil {
return err
return false, err
}
ob.WithLastBlockScanned(blockNumber)
return nil
return false, nil
}

// load from DB otherwise. If not found, start from latest block
blockNumber, err := ob.ReadLastBlockScannedFromDB()
if err != nil {
logger.Info().Msgf("LoadLastBlockScanned: last scanned block not found in db for chain %d", ob.chain.ChainId)
return nil
logger.Info().Msgf("LoadLastBlockScanned: chain %d starts scanning from latest block", ob.chain.ChainId)
return true, nil
}
ob.WithLastBlockScanned(blockNumber)
logger.Info().
Msgf("LoadLastBlockScanned: chain %d starts scanning from block %d", ob.chain.ChainId, ob.LastBlockScanned())

return nil
}

// SaveLastBlockScanned saves the last scanned block to memory and database.
func (ob *Observer) SaveLastBlockScanned(blockNumber uint64) error {
ob.WithLastBlockScanned(blockNumber)
return ob.WriteLastBlockScannedToDB(blockNumber)
return false, nil
}

// WriteLastBlockScannedToDB saves the last scanned block to the database.
Expand All @@ -385,5 +339,5 @@ func (ob *Observer) ReadLastBlockScannedFromDB() (uint64, error) {

// EnvVarLatestBlock returns the environment variable for the latest block by chain.
func EnvVarLatestBlockByChain(chain chains.Chain) string {
return fmt.Sprintf("CHAIN_%d_SCAN_FROM", chain.ChainId)
return chain.ChainName.String() + "_SCAN_FROM"
}
Loading

0 comments on commit 7faa962

Please sign in to comment.