Skip to content

Commit

Permalink
refactor(zetaclient): subscribe to new blocks in scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
gartnera committed Nov 27, 2024
1 parent 771317f commit 8fab7cd
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 104 deletions.
14 changes: 14 additions & 0 deletions cmd/zetaclientd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"fmt"
"net/http"
_ "net/http/pprof" // #nosec G108 -- pprof enablement is intentional
"os"
Expand All @@ -10,6 +11,7 @@ import (
"strings"
"syscall"

cometbft_client "github.com/cometbft/cometbft/rpc/client/http"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
Expand Down Expand Up @@ -217,12 +219,24 @@ func Start(_ *cobra.Command, _ []string) error {
return errors.Wrap(err, "unable to create chain observer map")
}

cometbftURL := fmt.Sprintf("http://%s:%d", cfg.ZetaCoreURL, 26657)
cometbftClient, err := cometbft_client.New(cometbftURL, "/websocket")
if err != nil {
return errors.Wrapf(err, "new cometbft client (%s)", cometbftURL)
}
// start websockets
err = cometbftClient.WSEvents.Start()
if err != nil {
return errors.Wrap(err, "cometbft start")
}

// Orchestrator wraps the zetacore client and adds the observers and signer maps to it.
// This is the high level object used for CCTX interactions
// It also handles background configuration updates from zetacore
maestro, err := orchestrator.New(
ctx,
zetacoreClient,
cometbftClient,
signerMap,
observerMap,
tss,
Expand Down
206 changes: 102 additions & 104 deletions zetaclient/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import (
"fmt"
"math"
"sync"
"time"

sdkmath "cosmossdk.io/math"
cometbft_rpc_client "github.com/cometbft/cometbft/rpc/client"
cometbft_rpc_types "github.com/cometbft/cometbft/rpc/core/types"
cometbft_types "github.com/cometbft/cometbft/types"
eth "github.com/ethereum/go-ethereum/common"
"github.com/pkg/errors"
"github.com/rs/zerolog"
Expand Down Expand Up @@ -51,6 +53,9 @@ type Orchestrator struct {
// zetacore client
zetacoreClient interfaces.ZetacoreClient

// cometbft client
cometbftClient cometbft_rpc_client.Client

// signerMap contains the chain signers indexed by chainID
signerMap map[int64]interfaces.ChainSigner

Expand Down Expand Up @@ -84,6 +89,7 @@ type multiLogger struct {
func New(
ctx context.Context,
client interfaces.ZetacoreClient,
cometbftClient cometbft_rpc_client.Client,
signerMap map[int64]interfaces.ChainSigner,
observerMap map[int64]interfaces.ChainObserver,
tss interfaces.TSSSigner,
Expand All @@ -105,6 +111,7 @@ func New(

return &Orchestrator{
zetacoreClient: client,
cometbftClient: cometbftClient,

signerMap: signerMap,
observerMap: observerMap,
Expand Down Expand Up @@ -281,6 +288,10 @@ func (oc *Orchestrator) GetPendingCctxsWithinRateLimit(ctx context.Context, chai
return output.CctxsMap, nil
}

func (oc *Orchestrator) subscribeNewBlock(ctx context.Context) (out <-chan cometbft_rpc_types.ResultEvent, err error) {
return oc.cometbftClient.Subscribe(ctx, "", "tm.event='NewBlock'")
}

// schedules keysigns for cctxs on each ZetaChain block (the ticker)
// TODO(revamp): make this function simpler
func (oc *Orchestrator) runScheduler(ctx context.Context) error {
Expand All @@ -289,122 +300,109 @@ func (oc *Orchestrator) runScheduler(ctx context.Context) error {
return err
}

observeTicker := time.NewTicker(3 * time.Second)
var lastBlockNum int64
blockEventChan, err := oc.subscribeNewBlock(ctx)
if err != nil {
return err
}

for {
select {
case <-oc.stop:
oc.logger.Warn().Msg("runScheduler: stopped")
return nil
case <-observeTicker.C:
{
bn, err := oc.zetacoreClient.GetBlockHeight(ctx)
case event := <-blockEventChan:
newBlockEvent, ok := event.Data.(cometbft_types.EventDataNewBlock)
if !ok {
return fmt.Errorf("expecting new block event, got %T", event.Data)
}
bn := newBlockEvent.Block.Height

balance, err := oc.zetacoreClient.GetZetaHotKeyBalance(ctx)
if err != nil {
oc.logger.Error().Err(err).Msgf("couldn't get operator balance")
} else {
diff := oc.lastOperatorBalance.Sub(balance)
if diff.GT(sdkmath.NewInt(0)) && diff.LT(sdkmath.NewInt(math.MaxInt64)) {
oc.ts.AddFeeEntry(bn, diff.Int64())
oc.lastOperatorBalance = balance
}
}

// set current hot key burn rate
metrics.HotKeyBurnRate.Set(float64(oc.ts.HotKeyBurnRate.GetBurnRate().Int64()))

// get chain ids without zeta chain
chainIDs := lo.FilterMap(app.ListChains(), func(c zctx.Chain, _ int) (int64, bool) {
return c.ID(), !c.IsZeta()
})

// query pending cctxs across all external chains within rate limit
cctxMap, err := oc.GetPendingCctxsWithinRateLimit(ctx, chainIDs)
if err != nil {
oc.logger.Error().Err(err).Msgf("runScheduler: GetPendingCctxsWithinRatelimit failed")
}

// schedule keysign for pending cctxs on each chain
for _, chain := range app.ListChains() {
// skip zeta chain
if chain.IsZeta() {
continue
}

chainID := chain.ID()

// update chain parameters for signer and chain observer
signer, err := oc.resolveSigner(app, chainID)
if err != nil {
oc.logger.Error().Err(err).Msg("StartCctxScheduler: GetBlockHeight fail")
oc.logger.Error().Err(err).
Int64(logs.FieldChain, chainID).
Msg("runScheduler: unable to resolve signer")
continue
}
if bn < 0 {
oc.logger.Error().Msg("runScheduler: GetBlockHeight returned negative height")

ob, err := oc.resolveObserver(app, chainID)
if err != nil {
oc.logger.Error().Err(err).
Int64(logs.FieldChain, chainID).
Msg("runScheduler: unable to resolve observer")
continue
}
if lastBlockNum == 0 {
lastBlockNum = bn - 1

// get cctxs from map and set pending transactions prometheus gauge
cctxList := cctxMap[chainID]

metrics.PendingTxsPerChain.
WithLabelValues(chain.Name()).
Set(float64(len(cctxList)))

if len(cctxList) == 0 {
continue
}
if bn > lastBlockNum { // we have a new block
bn = lastBlockNum + 1
if bn%10 == 0 {
oc.logger.Debug().Msgf("runScheduler: zetacore heart beat: %d", bn)
}

balance, err := oc.zetacoreClient.GetZetaHotKeyBalance(ctx)
if err != nil {
oc.logger.Error().Err(err).Msgf("couldn't get operator balance")
} else {
diff := oc.lastOperatorBalance.Sub(balance)
if diff.GT(sdkmath.NewInt(0)) && diff.LT(sdkmath.NewInt(math.MaxInt64)) {
oc.ts.AddFeeEntry(bn, diff.Int64())
oc.lastOperatorBalance = balance
}
}

// set current hot key burn rate
metrics.HotKeyBurnRate.Set(float64(oc.ts.HotKeyBurnRate.GetBurnRate().Int64()))

// get chain ids without zeta chain
chainIDs := lo.FilterMap(app.ListChains(), func(c zctx.Chain, _ int) (int64, bool) {
return c.ID(), !c.IsZeta()
})

// query pending cctxs across all external chains within rate limit
cctxMap, err := oc.GetPendingCctxsWithinRateLimit(ctx, chainIDs)
if err != nil {
oc.logger.Error().Err(err).Msgf("runScheduler: GetPendingCctxsWithinRatelimit failed")
}

// schedule keysign for pending cctxs on each chain
for _, chain := range app.ListChains() {
// skip zeta chain
if chain.IsZeta() {
continue
}

chainID := chain.ID()

// update chain parameters for signer and chain observer
signer, err := oc.resolveSigner(app, chainID)
if err != nil {
oc.logger.Error().Err(err).
Int64(logs.FieldChain, chainID).
Msg("runScheduler: unable to resolve signer")
continue
}

ob, err := oc.resolveObserver(app, chainID)
if err != nil {
oc.logger.Error().Err(err).
Int64(logs.FieldChain, chainID).
Msg("runScheduler: unable to resolve observer")
continue
}

// get cctxs from map and set pending transactions prometheus gauge
cctxList := cctxMap[chainID]

metrics.PendingTxsPerChain.
WithLabelValues(chain.Name()).
Set(float64(len(cctxList)))

if len(cctxList) == 0 {
continue
}

if !app.IsOutboundObservationEnabled() {
continue
}

// #nosec G115 range is verified
zetaHeight := uint64(bn)

switch {
case chain.IsEVM():
oc.ScheduleCctxEVM(ctx, zetaHeight, chainID, cctxList, ob, signer)
case chain.IsBitcoin():
oc.ScheduleCctxBTC(ctx, zetaHeight, chainID, cctxList, ob, signer)
case chain.IsSolana():
oc.ScheduleCctxSolana(ctx, zetaHeight, chainID, cctxList, ob, signer)
case chain.IsTON():
oc.ScheduleCCTXTON(ctx, zetaHeight, chainID, cctxList, ob, signer)
default:
oc.logger.Error().Msgf("runScheduler: no scheduler found chain %d", chainID)
continue
}
}

// update last processed block number
lastBlockNum = bn
oc.ts.SetCoreBlockNumber(lastBlockNum)

if !app.IsOutboundObservationEnabled() {
continue
}

// #nosec G115 range is verified
zetaHeight := uint64(bn)

switch {
case chain.IsEVM():
oc.ScheduleCctxEVM(ctx, zetaHeight, chainID, cctxList, ob, signer)
case chain.IsBitcoin():
oc.ScheduleCctxBTC(ctx, zetaHeight, chainID, cctxList, ob, signer)
case chain.IsSolana():
oc.ScheduleCctxSolana(ctx, zetaHeight, chainID, cctxList, ob, signer)
case chain.IsTON():
oc.ScheduleCCTXTON(ctx, zetaHeight, chainID, cctxList, ob, signer)
default:
oc.logger.Error().Msgf("runScheduler: no scheduler found chain %d", chainID)
continue
}
}

// update last processed block number
oc.ts.SetCoreBlockNumber(bn)
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions zetaclient/orchestrator/orchestrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/require"
zctx "github.com/zeta-chain/node/zetaclient/context"

cometbft_rpc_client_mock "github.com/cometbft/cometbft/rpc/client/mocks"
"github.com/zeta-chain/node/pkg/chains"
"github.com/zeta-chain/node/pkg/coin"
"github.com/zeta-chain/node/testutil/sample"
Expand Down Expand Up @@ -587,6 +588,7 @@ func mockOrchestrator(t *testing.T, zetaClient interfaces.ZetacoreClient, chains

return &Orchestrator{
zetacoreClient: zetaClient,
cometbftClient: &cometbft_rpc_client_mock.Client{},
signerMap: signers,
observerMap: observers,
}
Expand Down

0 comments on commit 8fab7cd

Please sign in to comment.