Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(zetaclient): move app context update from zetacore client #3131

Merged
merged 10 commits into from
Nov 12, 2024
1 change: 1 addition & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* [3118](https://github.com/zeta-chain/node/pull/3118) - zetaclient: remove hsm signer
* [3122](https://github.com/zeta-chain/node/pull/3122) - improve & refactor zetaclientd cli
* [3125](https://github.com/zeta-chain/node/pull/3125) - drop support for header proofs
* [3131](https://github.com/zeta-chain/node/pull/3131) - move app context update from zetacore client
* [3137](https://github.com/zeta-chain/node/pull/3137) - remove chain.Chain from zetaclientd config

### Fixes
Expand Down
4 changes: 3 additions & 1 deletion cmd/zetaclientd/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/zeta-chain/node/zetaclient/config"
zctx "github.com/zeta-chain/node/zetaclient/context"
"github.com/zeta-chain/node/zetaclient/keys"
"github.com/zeta-chain/node/zetaclient/orchestrator"
"github.com/zeta-chain/node/zetaclient/zetacore"
)

Expand Down Expand Up @@ -69,7 +70,8 @@ func InboundGetBallot(_ *cobra.Command, args []string) error {
appContext := zctx.New(cfg, nil, zerolog.Nop())
ctx := zctx.WithAppContext(context.Background(), appContext)

if err := client.UpdateAppContext(ctx, appContext, zerolog.Nop()); err != nil {
err = orchestrator.UpdateAppContext(ctx, appContext, client, zerolog.Nop())
if err != nil {
return errors.Wrap(err, "failed to update app context")
}

Expand Down
82 changes: 46 additions & 36 deletions cmd/zetaclientd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/zeta-chain/node/pkg/chains"
"github.com/zeta-chain/node/pkg/constant"
zetaos "github.com/zeta-chain/node/pkg/os"
"github.com/zeta-chain/node/pkg/ticker"
observerTypes "github.com/zeta-chain/node/x/observer/types"
"github.com/zeta-chain/node/zetaclient/chains/base"
"github.com/zeta-chain/node/zetaclient/config"
Expand Down Expand Up @@ -100,7 +101,7 @@ func Start(_ *cobra.Command, _ []string) error {
}

// Wait until zetacore is ready to create blocks
if err = zetacoreClient.WaitForZetacoreToCreateBlocks(ctx); err != nil {
if err = waitForZetacoreToCreateBlocks(ctx, zetacoreClient, startLogger); err != nil {
startLogger.Error().Err(err).Msg("WaitForZetacoreToCreateBlocks error")
return err
}
Expand Down Expand Up @@ -141,14 +142,12 @@ func Start(_ *cobra.Command, _ []string) error {
startLogger.Debug().Msgf("createAuthzSigner is ready")

// Initialize core parameters from zetacore
if err = zetacoreClient.UpdateAppContext(ctx, appContext, startLogger); err != nil {
if err = orchestrator.UpdateAppContext(ctx, appContext, zetacoreClient, startLogger); err != nil {
return errors.Wrap(err, "unable to update app context")
}

startLogger.Info().Msgf("Config is updated from zetacore\n %s", cfg.StringMasked())

go zetacoreClient.UpdateAppContextWorker(ctx, appContext)
swift1337 marked this conversation as resolved.
Show resolved Hide resolved

// Generate TSS address . The Tss address is generated through Keygen ceremony. The TSS key is used to sign all outbound transactions .
// The hotkeyPk is private key for the Hotkey. The Hotkey is used to sign all inbound transactions
// Each node processes a portion of the key stored in ~/.tss by default . Custom location can be specified in config file during init.
Expand Down Expand Up @@ -201,33 +200,36 @@ func Start(_ *cobra.Command, _ []string) error {
}

// Create TSS server
server, err := mc.SetupTSSServer(peers, priKey, preParams, appContext.Config(), tssKeyPass, true, whitelistedPeers)
tssServer, err := mc.SetupTSSServer(
peers,
priKey,
preParams,
appContext.Config(),
tssKeyPass,
true,
whitelistedPeers,
)
if err != nil {
return fmt.Errorf("SetupTSSServer error: %w", err)
}

// Set P2P ID for telemetry
telemetryServer.SetP2PID(server.GetLocalPeerID())
telemetryServer.SetP2PID(tssServer.GetLocalPeerID())

// Creating a channel to listen for os signals (or other signals)
signalChannel := make(chan os.Signal, 1)
signal.Notify(signalChannel, syscall.SIGINT, syscall.SIGTERM)

// Maintenance workers ============
maintenance.NewTSSListener(zetacoreClient, masterLogger).Listen(ctx, func() {
masterLogger.Info().Msg("TSS listener received an action to shutdown zetaclientd.")
signalChannel <- syscall.SIGTERM
})

go func() {
for {
time.Sleep(30 * time.Second)
ps := server.GetKnownPeers()
ps := tssServer.GetKnownPeers()
metrics.NumConnectedPeers.Set(float64(len(ps)))
telemetryServer.SetConnectedPeers(ps)
}
}()
go func() {
host := server.GetP2PHost()
host := tssServer.GetP2PHost()
pingRTT := make(map[peer.ID]int64)
for {
var wg sync.WaitGroup
Expand All @@ -254,7 +256,7 @@ func Start(_ *cobra.Command, _ []string) error {

// Generate a new TSS if keygen is set and add it into the tss server
// If TSS has already been generated, and keygen was successful ; we use the existing TSS
err = mc.Generate(ctx, masterLogger, zetacoreClient, server)
err = mc.Generate(ctx, zetacoreClient, tssServer, masterLogger)
if err != nil {
return err
}
Expand All @@ -264,7 +266,7 @@ func Start(_ *cobra.Command, _ []string) error {
zetacoreClient,
tssHistoricalList,
hotkeyPass,
server,
tssServer,
)
if err != nil {
startLogger.Error().Err(err).Msg("NewTSS error")
Expand All @@ -279,23 +281,26 @@ func Start(_ *cobra.Command, _ []string) error {

// Wait for TSS keygen to be successful before proceeding, This is a blocking thread only for a new keygen.
// For existing keygen, this should directly proceed to the next step
ticker := time.NewTicker(time.Second * 1)
for range ticker.C {
keyGen := appContext.GetKeygen()
if keyGen.Status != observerTypes.KeygenStatus_KeyGenSuccess {
startLogger.Info().Msgf("Waiting for TSS Keygen to be a success, current status %s", keyGen.Status)
continue
_ = ticker.Run(ctx, time.Second, func(ctx context.Context, t *ticker.Ticker) error {
keygen, err = zetacoreClient.GetKeyGen(ctx)
switch {
case err != nil:
startLogger.Warn().Err(err).Msg("Waiting for TSS Keygen to be a success, got error")
case keygen.Status != observerTypes.KeygenStatus_KeyGenSuccess:
startLogger.Warn().Msgf("Waiting for TSS Keygen to be a success, current status %s", keygen.Status)
default:
t.Stop()
}
break
}

return nil
})

// Update Current TSS value from zetacore, if TSS keygen is successful, the TSS address is set on zeta-core
// Returns err if the RPC call fails as zeta client needs the current TSS address to be set
// This is only needed in case of a new Keygen , as the TSS address is set on zetacore only after the keygen is successful i.e enough votes have been broadcast
currentTss, err := zetacoreClient.GetTSS(ctx)
if err != nil {
startLogger.Error().Err(err).Msg("GetCurrentTSS error")
return err
return errors.Wrap(err, "unable to get current TSS")
}

// Filter supported BTC chain IDs
Expand All @@ -314,6 +319,13 @@ func Start(_ *cobra.Command, _ []string) error {
return err
}

// Starts various background TSS listeners.
// Shuts down zetaclientd if any is triggered.
maintenance.NewTSSListener(zetacoreClient, masterLogger).Listen(ctx, func() {
masterLogger.Info().Msg("TSS listener received an action to shutdown zetaclientd.")
signalChannel <- syscall.SIGTERM
})

if len(appContext.ListChainIDs()) == 0 {
startLogger.Error().Interface("config", cfg).Msgf("No chains in updated config")
}
Expand Down Expand Up @@ -348,12 +360,12 @@ func Start(_ *cobra.Command, _ []string) error {
// Each chain observer is responsible for observing events on the chain and processing them.
observerMap, err := orchestrator.CreateChainObserverMap(ctx, zetacoreClient, tss, dbpath, logger, telemetryServer)
if err != nil {
startLogger.Err(err).Msg("CreateChainObserverMap")
return err
return errors.Wrap(err, "unable to create chain observer map")
}

// Orchestrator wraps the zetacore client and adds the observers and signer maps to it.
// This is the high level object used for CCTX interactions
// It also handles background configuration updates from zetacore
maestro, err := orchestrator.New(
ctx,
zetacoreClient,
Expand All @@ -365,14 +377,12 @@ func Start(_ *cobra.Command, _ []string) error {
telemetryServer,
)
if err != nil {
startLogger.Error().Err(err).Msg("Unable to create orchestrator")
return err
return errors.Wrap(err, "unable to create orchestrator")
}

// Start orchestrator with all observers and signers
if err := maestro.Start(ctx); err != nil {
startLogger.Error().Err(err).Msg("Unable to start orchestrator")
return err
if err = maestro.Start(ctx); err != nil {
return errors.Wrap(err, "unable to start orchestrator")
}

// start zeta supply checker
Expand All @@ -389,12 +399,12 @@ func Start(_ *cobra.Command, _ []string) error {
// defer zetaSupplyChecker.Stop()
//}

startLogger.Info().Msgf("Zetaclientd is running")
startLogger.Info().Msg("zetaclientd is running")

sig := <-signalChannel
startLogger.Info().Msgf("Stop signal received: %q", sig)
startLogger.Info().Msgf("Stop signal received: %q. Stopping zetaclientd", sig)

zetacoreClient.Stop()
maestro.Stop()

return nil
}
Expand Down
30 changes: 30 additions & 0 deletions cmd/zetaclientd/utils.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"fmt"
"net"
"strings"
Expand All @@ -13,6 +14,7 @@ import (
"google.golang.org/grpc/credentials/insecure"

"github.com/zeta-chain/node/zetaclient/authz"
"github.com/zeta-chain/node/zetaclient/chains/interfaces"
"github.com/zeta-chain/node/zetaclient/config"
"github.com/zeta-chain/node/zetaclient/keys"
"github.com/zeta-chain/node/zetaclient/zetacore"
Expand Down Expand Up @@ -71,6 +73,34 @@ func waitForZetaCore(config config.Config, logger zerolog.Logger) {
}
}

func waitForZetacoreToCreateBlocks(ctx context.Context, zc interfaces.ZetacoreClient, logger zerolog.Logger) error {
const (
interval = 5 * time.Second
attempts = 15
)

var (
retryCount = 0
start = time.Now()
)

for {
blockHeight, err := zc.GetBlockHeight(ctx)
if err == nil && blockHeight > 1 {
logger.Info().Msgf("Zeta block height: %d", blockHeight)
return nil
}

retryCount++
if retryCount > attempts {
return fmt.Errorf("zetacore is not ready, timeout %s", time.Since(start).String())
}

logger.Info().Msgf("Failed to get block number, retry : %d/%d", retryCount, attempts)
time.Sleep(interval)
}
}

func validatePeer(seedPeer string) error {
parsedPeer := strings.Split(seedPeer, "/")

Expand Down
2 changes: 1 addition & 1 deletion pkg/rpc/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func NewCometBFTClients(url string) (Clients, error) {
return newClients(clientCtx)
}

// NewGRPCClient creates a Clients which uses gRPC as the transport
// NewGRPCClients creates a Clients which uses gRPC as the transport
func NewGRPCClients(url string, opts ...grpc.DialOption) (Clients, error) {
grpcConn, err := grpc.Dial(url, opts...)
if err != nil {
Expand Down
3 changes: 1 addition & 2 deletions pkg/rpc/clients_cosmos.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ import (
"github.com/zeta-chain/node/cmd/zetacored/config"
)

// GetUpgradePlan returns the current upgrade plan.
// if there is no active upgrade plan, plan will be nil, err will be nil as well.
// GetUpgradePlan returns the current upgrade plan or nil if there is no plan.
func (c *Clients) GetUpgradePlan(ctx context.Context) (*upgradetypes.Plan, error) {
in := &upgradetypes.QueryCurrentPlanRequest{}

Expand Down
10 changes: 7 additions & 3 deletions zetaclient/chains/interfaces/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/rpcclient"
"github.com/btcsuite/btcd/wire"
upgradetypes "github.com/cosmos/cosmos-sdk/x/upgrade/types"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
ethcommon "github.com/ethereum/go-ethereum/common"
ethtypes "github.com/ethereum/go-ethereum/core/types"
Expand Down Expand Up @@ -102,6 +103,10 @@ type ZetacoreClient interface {
GetLogger() *zerolog.Logger
GetKeys() keyinterfaces.ObserverKeys

GetSupportedChains(ctx context.Context) ([]chains.Chain, error)
GetAdditionalChains(ctx context.Context) ([]chains.Chain, error)
GetChainParams(ctx context.Context) ([]*observertypes.ChainParams, error)

GetKeyGen(ctx context.Context) (observertypes.Keygen, error)
GetTSS(ctx context.Context) (observertypes.TSS, error)
GetTSSHistory(ctx context.Context) ([]observertypes.TSS, error)
Expand Down Expand Up @@ -130,10 +135,9 @@ type ZetacoreClient interface {
GetZetaHotKeyBalance(ctx context.Context) (sdkmath.Int, error)
GetInboundTrackersForChain(ctx context.Context, chainID int64) ([]crosschaintypes.InboundTracker, error)

PostOutboundTracker(ctx context.Context, chainID int64, nonce uint64, txHash string) (string, error)
GetUpgradePlan(ctx context.Context) (*upgradetypes.Plan, error)

Stop()
OnBeforeStop(callback func())
PostOutboundTracker(ctx context.Context, chainID int64, nonce uint64, txHash string) (string, error)
}

// BTCRPCClient is the interface for BTC RPC client
Expand Down
Loading
Loading