From 0bc8952df0061e6af9834dde690379fb61e50c3b Mon Sep 17 00:00:00 2001 From: Tanmay Date: Fri, 19 Jul 2024 11:55:24 -0400 Subject: [PATCH 01/20] add background threads --- cmd/zetaclientd-supervisor/lib.go | 6 +- cmd/zetaclientd-supervisor/main.go | 14 +- cmd/zetaclientd/start.go | 24 ++- contrib/localnet/docker-compose.yml | 2 + pkg/bg/bg.go | 13 +- zetaclient/context/app.go | 4 + zetaclient/orchestrator/orchestrator.go | 6 +- zetaclient/zetacore/background_threads.go | 174 +++++++++++++++++++ zetaclient/zetacore/client_query_observer.go | 9 + 9 files changed, 237 insertions(+), 15 deletions(-) create mode 100644 zetaclient/zetacore/background_threads.go diff --git a/cmd/zetaclientd-supervisor/lib.go b/cmd/zetaclientd-supervisor/lib.go index 71f492e88b..4d71dcd573 100644 --- a/cmd/zetaclientd-supervisor/lib.go +++ b/cmd/zetaclientd-supervisor/lib.go @@ -99,9 +99,9 @@ func newZetaclientdSupervisor( func (s *zetaclientdSupervisor) Start(ctx context.Context) { go s.watchForVersionChanges(ctx) go s.handleCoreUpgradePlan(ctx) - go s.handleNewKeygen(ctx) - go s.handleNewTSSKeyGeneration(ctx) - go s.handleTSSUpdate(ctx) + //go s.handleNewKeygen(ctx) + //go s.handleNewTSSKeyGeneration(ctx) + //go s.handleTSSUpdate(ctx) } func (s *zetaclientdSupervisor) WaitForReloadSignal(ctx context.Context) { diff --git a/cmd/zetaclientd-supervisor/main.go b/cmd/zetaclientd-supervisor/main.go index ee1e247be4..b72a92a94f 100644 --- a/cmd/zetaclientd-supervisor/main.go +++ b/cmd/zetaclientd-supervisor/main.go @@ -69,10 +69,20 @@ func main() { cmd.Stdin = &passwordInputBuffer eg, ctx := errgroup.WithContext(ctx) - eg.Go(cmd.Run) + //eg.Go(cmd.Run) + eg.Go(func() error { + err := cmd.Run() + if err != nil { + logger.Error().Err(err).Msg("zetaclient process exited with error") + } else { + logger.Info().Msg("zetaclient process exited") + } + cancel() // Signal other goroutines to exit + return err + }) eg.Go(func() error { supervisor.WaitForReloadSignal(ctx) - cancel() + //cancel() return nil }) eg.Go(func() error { diff --git a/cmd/zetaclientd/start.go b/cmd/zetaclientd/start.go index fef035aee5..ab5c1262f6 100644 --- a/cmd/zetaclientd/start.go +++ b/cmd/zetaclientd/start.go @@ -21,8 +21,8 @@ import ( "github.com/rs/zerolog/log" "github.com/spf13/cobra" "github.com/zeta-chain/go-tss/p2p" - "github.com/zeta-chain/zetacore/pkg/authz" + "github.com/zeta-chain/zetacore/pkg/bg" "github.com/zeta-chain/zetacore/pkg/chains" "github.com/zeta-chain/zetacore/pkg/constant" observerTypes "github.com/zeta-chain/zetacore/x/observer/types" @@ -210,6 +210,12 @@ func start(_ *cobra.Command, _ []string) error { // Set P2P ID for telemetry telemetryServer.SetP2PID(server.GetLocalPeerID()) + notifyCtx, cancel := context.WithCancelCause(ctx) + defer cancel(errors.New("cancelling context on zeta-client exit")) + bg.Work(ctx, zetacoreClient.HandleTSSUpdate, bg.WithName("HandleTSSUpdate"), bg.WithLogger(masterLogger), bg.WithCancel(cancel)) + bg.Work(ctx, zetacoreClient.HandleNewKeygen, bg.WithName("HandleNewKeygen"), bg.WithLogger(masterLogger), bg.WithCancel(cancel)) + bg.Work(ctx, zetacoreClient.HandleNewTSSKeyGeneration, bg.WithName("HandleNewTSSKeyGeneration"), bg.WithLogger(masterLogger), bg.WithCancel(cancel)) + // Generate a new TSS if keygen is set and add it into the tss server // If TSS has already been generated, and keygen was successful ; we use the existing TSS err = GenerateTSS(ctx, masterLogger, zetacoreClient, server) @@ -326,6 +332,7 @@ func start(_ *cobra.Command, _ []string) error { masterLogger, telemetryServer, ) + err = orchestrator.MonitorCore(ctx) if err != nil { startLogger.Error().Err(err).Msg("Orchestrator failed to start") @@ -346,18 +353,23 @@ func start(_ *cobra.Command, _ []string) error { // defer zetaSupplyChecker.Stop() //} - startLogger.Info().Msgf("awaiting the os.Interrupt, syscall.SIGTERM signals...") ch := make(chan os.Signal, 1) signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) - sig := <-ch - startLogger.Info().Msgf("stop signal received: %s", sig) - // stop chain observers + startLogger.Info().Msgf("awaiting the os.Interrupt, syscall.SIGTERM signals...") + select { + case <-notifyCtx.Done(): + cause := context.Cause(notifyCtx) + startLogger.Info().Msgf("shutdown signal received , cause : %s", cause) + case sig := <-ch: + startLogger.Info().Msgf("stop signal received: %s", sig) + } + + //stop chain observers for _, observer := range observerMap { observer.Stop() } zetacoreClient.Stop() - return nil } diff --git a/contrib/localnet/docker-compose.yml b/contrib/localnet/docker-compose.yml index 880b6de8ad..e89184f33f 100644 --- a/contrib/localnet/docker-compose.yml +++ b/contrib/localnet/docker-compose.yml @@ -124,6 +124,7 @@ services: - ETHDEV_ENDPOINT=http://eth:8545 - HOTKEY_BACKEND=file - HOTKEY_PASSWORD=password # test purposes only + restart: always volumes: - ssh:/root/.ssh - preparams:/root/preparams @@ -140,6 +141,7 @@ services: - ETHDEV_ENDPOINT=http://eth:8545 - HOTKEY_BACKEND=file - HOTKEY_PASSWORD=password # test purposes only + restart: always volumes: - ssh:/root/.ssh - preparams:/root/preparams diff --git a/pkg/bg/bg.go b/pkg/bg/bg.go index 85d85964cf..4b76ff8312 100644 --- a/pkg/bg/bg.go +++ b/pkg/bg/bg.go @@ -3,6 +3,7 @@ package bg import ( "context" + "errors" "fmt" "github.com/rs/zerolog" @@ -11,6 +12,7 @@ import ( type config struct { name string logger zerolog.Logger + cancel context.CancelCauseFunc } type Opt func(*config) @@ -19,6 +21,10 @@ func WithName(name string) Opt { return func(cfg *config) { cfg.name = name } } +func WithCancel(cancel context.CancelCauseFunc) Opt { + return func(cfg *config) { cfg.cancel = cancel } +} + func WithLogger(logger zerolog.Logger) Opt { return func(cfg *config) { cfg.logger = logger } } @@ -28,6 +34,7 @@ func Work(ctx context.Context, f func(context.Context) error, opts ...Opt) { cfg := config{ name: "", logger: zerolog.Nop(), + cancel: nil, } for _, opt := range opts { @@ -42,9 +49,13 @@ func Work(ctx context.Context, f func(context.Context) error, opts ...Opt) { } }() - if err := f(ctx); err != nil { + err := f(ctx) + if err != nil { logError(err, cfg) } + if cfg.cancel != nil && err == nil { + cfg.cancel(errors.New(fmt.Sprintf("function : %s triggered restart", cfg.name))) + } }() } diff --git a/zetaclient/context/app.go b/zetaclient/context/app.go index 4888443ea9..a18053854a 100644 --- a/zetaclient/context/app.go +++ b/zetaclient/context/app.go @@ -70,6 +70,10 @@ func (a *AppContext) Config() config.Config { return a.config } +func (a *AppContext) Logger() zerolog.Logger { + return a.logger +} + // GetBTCChainAndConfig returns btc chain and config if enabled func (a *AppContext) GetBTCChainAndConfig() (chains.Chain, config.BTCConfig, bool) { btcConfig, configEnabled := a.Config().GetBTCConfig() diff --git a/zetaclient/orchestrator/orchestrator.go b/zetaclient/orchestrator/orchestrator.go index 951c4146f6..cf277ca29e 100644 --- a/zetaclient/orchestrator/orchestrator.go +++ b/zetaclient/orchestrator/orchestrator.go @@ -116,9 +116,9 @@ func (oc *Orchestrator) MonitorCore(ctx context.Context) error { shutdownOrchestrator := func() { // now stop orchestrator and all observers close(oc.stop) - for _, c := range oc.observerMap { - c.Stop() - } + //for _, c := range oc.observerMap { + // c.Stop() + //} } oc.zetacoreClient.OnBeforeStop(shutdownOrchestrator) diff --git a/zetaclient/zetacore/background_threads.go b/zetaclient/zetacore/background_threads.go new file mode 100644 index 0000000000..060fdb013b --- /dev/null +++ b/zetaclient/zetacore/background_threads.go @@ -0,0 +1,174 @@ +package zetacore + +import ( + "context" + "fmt" + "time" + + "cosmossdk.io/errors" + "github.com/cenkalti/backoff/v4" + "github.com/zeta-chain/zetacore/pkg/retry" + observertypes "github.com/zeta-chain/zetacore/x/observer/types" + zctx "github.com/zeta-chain/zetacore/zetaclient/context" +) + +func (c *Client) HandleTSSUpdate(ctx context.Context) error { + app, err := zctx.FromContext(ctx) + if err != nil { + return errors.Wrap(err, "failed to get app context") + } + + logger := app.Logger().With().Str("module", "HandleTSSUpdate").Logger() + + bo := backoff.NewConstantBackOff(5 * time.Second) + backoff.WithMaxRetries(bo, 10) + + // Initial TSS retrieval + tss, err := retry.DoTypedWithBackoffAndRetry[observertypes.TSS](func() (observertypes.TSS, error) { + return c.GetTSS(ctx) + }, bo) + if err != nil { + logger.Warn().Err(err).Msg("unable to get initial tss") + return err + } + + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + { + tssNew, err := retry.DoTypedWithBackoffAndRetry[observertypes.TSS](func() (observertypes.TSS, error) { + return c.GetTSS(ctx) + }, bo) + if err != nil { + logger.Warn().Err(err).Msg("unable to get new tss") + continue + } + + if tssNew.TssPubkey == tss.TssPubkey { + continue + } + tss = tssNew + logger.Info().Msgf("tss address is updated from %s to %s", tss.TssPubkey, tssNew.TssPubkey) + logger.Info().Msg("restarting zetaclient to update tss address") + return nil + } + case <-ctx.Done(): + { + return errors.Wrap(ctx.Err(), "context done") + } + } + } +} + +func (c *Client) HandleNewTSSKeyGeneration(ctx context.Context) error { + app, err := zctx.FromContext(ctx) + if err != nil { + return errors.Wrap(err, "failed to get app context") + } + + logger := app.Logger().With().Str("module", "HandleNewTSSKeyGeneration").Logger() + + bo := backoff.NewConstantBackOff(5 * time.Second) + backoff.WithMaxRetries(bo, 10) + + // Initial TSS retrieval + tssHistoricalList, err := retry.DoTypedWithBackoffAndRetry[[]observertypes.TSS](func() ([]observertypes.TSS, error) { + return c.GetTSSHistory(ctx) + }, bo) + if err != nil { + return errors.Wrap(err, "failed to get initial tss history") + } + tssLen := len(tssHistoricalList) + + fmt.Println("tssLen old: ", tssLen) + + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + { + tssHistoricalListNew, err := retry.DoTypedWithBackoffAndRetry[[]observertypes.TSS](func() ([]observertypes.TSS, error) { + return c.GetTSSHistory(ctx) + }, bo) + if err != nil { + continue + } + tssLenUpdated := len(tssHistoricalListNew) + fmt.Println("tssLen updated: ", tssLenUpdated) + + if tssLenUpdated == tssLen { + continue + } + if tssLenUpdated < tssLen { + tssLen = tssLenUpdated + continue + } + logger.Info().Msgf("tss list updated from %d to %d", tssLen, tssLenUpdated) + tssLen = tssLenUpdated + logger.Info().Msg("restarting zetaclient to update tss list") + return nil + } + case <-ctx.Done(): + { + return errors.Wrap(ctx.Err(), "context done") + } + } + } +} +func (c *Client) HandleNewKeygen(ctx context.Context) error { + app, err := zctx.FromContext(ctx) + if err != nil { + return err + } + logger := app.Logger().With().Str("module", "HandleNewKeygen").Logger() + + bo := backoff.NewConstantBackOff(5 * time.Second) + backoff.WithMaxRetries(bo, 10) + + // Initial TSS retrieval + keygen, err := retry.DoTypedWithBackoffAndRetry[*observertypes.Keygen](func() (*observertypes.Keygen, error) { + return c.GetKeyGen(ctx) + }, bo) + if err != nil { + return errors.Wrap(err, "failed to get initial tss history") + } + + ticker := time.NewTicker(5 * time.Second) + + for { + select { + case <-ticker.C: + { + keygenUpdated, err := c.GetKeyGen(ctx) + if err != nil { + logger.Warn().Err(err).Msg("unable to get keygen") + continue + } + if keygenUpdated == nil { + logger.Warn().Err(err).Msg("keygen is nil") + continue + } + if keygenUpdated.Status != observertypes.KeygenStatus_PendingKeygen { + continue + } + + if keygen.BlockNumber == keygenUpdated.BlockNumber { + continue + } + + keygen = keygenUpdated + logger.Info().Msgf("got new keygen at block %d", keygen.BlockNumber) + return nil + } + case <-ctx.Done(): + { + return errors.Wrap(ctx.Err(), "context done") + } + } + } +} diff --git a/zetaclient/zetacore/client_query_observer.go b/zetaclient/zetacore/client_query_observer.go index da4fcbad7a..e14fae5e0c 100644 --- a/zetaclient/zetacore/client_query_observer.go +++ b/zetaclient/zetacore/client_query_observer.go @@ -167,6 +167,15 @@ func (c *Client) GetBTCTSSAddress(ctx context.Context, chainID int64) (string, e return resp.Btc, nil } +func (c *Client) GetTSS(ctx context.Context) (types.TSS, error) { + resp, err := c.client.observer.TSS(ctx, &types.QueryGetTSSRequest{}) + if err != nil { + return types.TSS{}, errors.Wrap(err, "failed to get tss") + } + + return resp.TSS, nil +} + // GetTSSHistory returns the TSS history func (c *Client) GetTSSHistory(ctx context.Context) ([]types.TSS, error) { resp, err := c.client.observer.TssHistory(ctx, &types.QueryTssHistoryRequest{}) From 3338f44fabf2d9fc449453f6b523a5f3e70003ab Mon Sep 17 00:00:00 2001 From: Tanmay Date: Tue, 23 Jul 2024 20:44:39 -0400 Subject: [PATCH 02/20] add cancel for child context --- Makefile | 2 +- cmd/zetaclientd/start.go | 21 ++++++++++++++----- zetaclient/zetacore/background_threads.go | 25 ++++++++--------------- 3 files changed, 26 insertions(+), 22 deletions(-) diff --git a/Makefile b/Makefile index 75bd9e9f92..21b1a24f5e 100644 --- a/Makefile +++ b/Makefile @@ -256,7 +256,7 @@ start-stress-test: zetanode start-tss-migration-test: zetanode @echo "--> Starting migration test" - export E2E_ARGS="--test-tss-migration" && \ + export E2E_ARGS="--test-tss-migration --skip-regular" && \ cd contrib/localnet/ && $(DOCKER) compose up -d ############################################################################### diff --git a/cmd/zetaclientd/start.go b/cmd/zetaclientd/start.go index ab5c1262f6..b3230e02c8 100644 --- a/cmd/zetaclientd/start.go +++ b/cmd/zetaclientd/start.go @@ -18,6 +18,7 @@ import ( "github.com/libp2p/go-libp2p/core" maddr "github.com/multiformats/go-multiaddr" "github.com/pkg/errors" + "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/spf13/cobra" "github.com/zeta-chain/go-tss/p2p" @@ -32,6 +33,7 @@ import ( "github.com/zeta-chain/zetacore/zetaclient/metrics" "github.com/zeta-chain/zetacore/zetaclient/orchestrator" mc "github.com/zeta-chain/zetacore/zetaclient/tss" + "github.com/zeta-chain/zetacore/zetaclient/zetacore" ) type Multiaddr = core.Multiaddr @@ -210,11 +212,11 @@ func start(_ *cobra.Command, _ []string) error { // Set P2P ID for telemetry telemetryServer.SetP2PID(server.GetLocalPeerID()) + // Create a notification context for background threads.These threads are responsible for sending shutdown signals to the main thread. notifyCtx, cancel := context.WithCancelCause(ctx) - defer cancel(errors.New("cancelling context on zeta-client exit")) - bg.Work(ctx, zetacoreClient.HandleTSSUpdate, bg.WithName("HandleTSSUpdate"), bg.WithLogger(masterLogger), bg.WithCancel(cancel)) - bg.Work(ctx, zetacoreClient.HandleNewKeygen, bg.WithName("HandleNewKeygen"), bg.WithLogger(masterLogger), bg.WithCancel(cancel)) - bg.Work(ctx, zetacoreClient.HandleNewTSSKeyGeneration, bg.WithName("HandleNewTSSKeyGeneration"), bg.WithLogger(masterLogger), bg.WithCancel(cancel)) + + // Start background threads + defer startBackgroundThreads(notifyCtx, cancel, zetacoreClient, masterLogger) // Generate a new TSS if keygen is set and add it into the tss server // If TSS has already been generated, and keygen was successful ; we use the existing TSS @@ -361,10 +363,11 @@ func start(_ *cobra.Command, _ []string) error { case <-notifyCtx.Done(): cause := context.Cause(notifyCtx) startLogger.Info().Msgf("shutdown signal received , cause : %s", cause) + case sig := <-ch: startLogger.Info().Msgf("stop signal received: %s", sig) } - + //cancelBackgroundThreads() //stop chain observers for _, observer := range observerMap { observer.Stop() @@ -431,3 +434,11 @@ func promptPasswords() (string, string, error) { return hotKeyPass, TSSKeyPass, err } + +func startBackgroundThreads(ctx context.Context, cancelFunc context.CancelCauseFunc, client *zetacore.Client, masterLogger zerolog.Logger) context.CancelFunc { + backgroundContext, cancel := context.WithCancel(ctx) + bg.Work(backgroundContext, client.HandleTSSUpdate, bg.WithName("HandleTSSUpdate"), bg.WithLogger(masterLogger), bg.WithCancel(cancelFunc)) + bg.Work(backgroundContext, client.HandleNewKeygen, bg.WithName("HandleNewKeygen"), bg.WithLogger(masterLogger), bg.WithCancel(cancelFunc)) + bg.Work(backgroundContext, client.HandleNewTSSKeyGeneration, bg.WithName("HandleNewTSSKeyGeneration"), bg.WithLogger(masterLogger), bg.WithCancel(cancelFunc)) + return cancel +} diff --git a/zetaclient/zetacore/background_threads.go b/zetaclient/zetacore/background_threads.go index 060fdb013b..b2083eb5a5 100644 --- a/zetaclient/zetacore/background_threads.go +++ b/zetaclient/zetacore/background_threads.go @@ -2,7 +2,6 @@ package zetacore import ( "context" - "fmt" "time" "cosmossdk.io/errors" @@ -39,9 +38,7 @@ func (c *Client) HandleTSSUpdate(ctx context.Context) error { select { case <-ticker.C: { - tssNew, err := retry.DoTypedWithBackoffAndRetry[observertypes.TSS](func() (observertypes.TSS, error) { - return c.GetTSS(ctx) - }, bo) + tssNew, err := c.GetTSS(ctx) if err != nil { logger.Warn().Err(err).Msg("unable to get new tss") continue @@ -50,14 +47,14 @@ func (c *Client) HandleTSSUpdate(ctx context.Context) error { if tssNew.TssPubkey == tss.TssPubkey { continue } - tss = tssNew logger.Info().Msgf("tss address is updated from %s to %s", tss.TssPubkey, tssNew.TssPubkey) - logger.Info().Msg("restarting zetaclient to update tss address") + tss = tssNew return nil } case <-ctx.Done(): { - return errors.Wrap(ctx.Err(), "context done") + logger.Info().Msg("HandleTSSUpdate stopped") + return nil } } } @@ -83,8 +80,6 @@ func (c *Client) HandleNewTSSKeyGeneration(ctx context.Context) error { } tssLen := len(tssHistoricalList) - fmt.Println("tssLen old: ", tssLen) - ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() @@ -92,14 +87,11 @@ func (c *Client) HandleNewTSSKeyGeneration(ctx context.Context) error { select { case <-ticker.C: { - tssHistoricalListNew, err := retry.DoTypedWithBackoffAndRetry[[]observertypes.TSS](func() ([]observertypes.TSS, error) { - return c.GetTSSHistory(ctx) - }, bo) + tssHistoricalListNew, err := c.GetTSSHistory(ctx) if err != nil { continue } tssLenUpdated := len(tssHistoricalListNew) - fmt.Println("tssLen updated: ", tssLenUpdated) if tssLenUpdated == tssLen { continue @@ -110,12 +102,12 @@ func (c *Client) HandleNewTSSKeyGeneration(ctx context.Context) error { } logger.Info().Msgf("tss list updated from %d to %d", tssLen, tssLenUpdated) tssLen = tssLenUpdated - logger.Info().Msg("restarting zetaclient to update tss list") return nil } case <-ctx.Done(): { - return errors.Wrap(ctx.Err(), "context done") + logger.Info().Msg("HandleNewTSSKeyGeneration stopped") + return nil } } } @@ -167,7 +159,8 @@ func (c *Client) HandleNewKeygen(ctx context.Context) error { } case <-ctx.Done(): { - return errors.Wrap(ctx.Err(), "context done") + logger.Info().Msg("HandleNewKeygen stopped") + return nil } } } From 625259bfc61c75d2063d5ea8839de60862c32c8b Mon Sep 17 00:00:00 2001 From: Tanmay Date: Tue, 23 Jul 2024 21:08:26 -0400 Subject: [PATCH 03/20] add comments --- Makefile | 2 +- cmd/zetaclientd-supervisor/lib.go | 130 +--------------------- cmd/zetaclientd/start.go | 7 +- pkg/bg/bg.go | 6 +- zetaclient/zetacore/background_threads.go | 11 +- 5 files changed, 21 insertions(+), 135 deletions(-) diff --git a/Makefile b/Makefile index 21b1a24f5e..75bd9e9f92 100644 --- a/Makefile +++ b/Makefile @@ -256,7 +256,7 @@ start-stress-test: zetanode start-tss-migration-test: zetanode @echo "--> Starting migration test" - export E2E_ARGS="--test-tss-migration --skip-regular" && \ + export E2E_ARGS="--test-tss-migration" && \ cd contrib/localnet/ && $(DOCKER) compose up -d ############################################################################### diff --git a/cmd/zetaclientd-supervisor/lib.go b/cmd/zetaclientd-supervisor/lib.go index 4d71dcd573..15644cdc61 100644 --- a/cmd/zetaclientd-supervisor/lib.go +++ b/cmd/zetaclientd-supervisor/lib.go @@ -12,7 +12,6 @@ import ( "runtime" "strings" "sync" - "syscall" "time" "github.com/cosmos/cosmos-sdk/client/grpc/tmservice" @@ -20,8 +19,8 @@ import ( "github.com/hashicorp/go-getter" "github.com/rs/zerolog" "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" - observertypes "github.com/zeta-chain/zetacore/x/observer/types" "github.com/zeta-chain/zetacore/zetaclient/config" ) @@ -79,7 +78,7 @@ func newZetaclientdSupervisor( logger = logger.With().Str("module", "zetaclientdSupervisor").Logger() conn, err := grpc.Dial( fmt.Sprintf("%s:9090", zetaCoreURL), - grpc.WithInsecure(), + grpc.WithTransportCredentials(insecure.NewCredentials()), ) if err != nil { return nil, fmt.Errorf("grpc dial: %w", err) @@ -177,125 +176,6 @@ func (s *zetaclientdSupervisor) watchForVersionChanges(ctx context.Context) { } } -func (s *zetaclientdSupervisor) handleTSSUpdate(ctx context.Context) { - maxRetries := 11 - retryInterval := 5 * time.Second - - // TODO : use retry library under pkg/retry - // https://github.com/zeta-chain/node/issues/2492 - for i := 0; i < maxRetries; i++ { - client := observertypes.NewQueryClient(s.zetacoredConn) - tss, err := client.TSS(ctx, &observertypes.QueryGetTSSRequest{}) - if err != nil { - s.logger.Warn().Err(err).Msg("unable to get original tss") - time.Sleep(retryInterval) - continue - } - i = 0 - for { - select { - case <-time.After(time.Second): - case <-ctx.Done(): - return - } - tssNew, err := client.TSS(ctx, &observertypes.QueryGetTSSRequest{}) - if err != nil { - s.logger.Warn().Err(err).Msg("unable to get tss") - continue - } - - if tssNew.TSS.TssPubkey == tss.TSS.TssPubkey { - continue - } - - tss = tssNew - s.logger.Info(). - Msgf("tss address is updated from %s to %s", tss.TSS.TssPubkey, tssNew.TSS.TssPubkey) - time.Sleep(6 * time.Second) - s.logger.Info().Msg("restarting zetaclientd to update tss address") - s.restartChan <- syscall.SIGHUP - } - } - s.logger.Warn().Msg("handleTSSUpdate exiting without success") -} - -func (s *zetaclientdSupervisor) handleNewTSSKeyGeneration(ctx context.Context) { - maxRetries := 11 - retryInterval := 5 * time.Second - - // TODO : use retry library under pkg/retry - for i := 0; i < maxRetries; i++ { - client := observertypes.NewQueryClient(s.zetacoredConn) - alltss, err := client.TssHistory(ctx, &observertypes.QueryTssHistoryRequest{}) - if err != nil { - s.logger.Warn().Err(err).Msg("unable to get tss original history") - time.Sleep(retryInterval) - continue - } - i = 0 - tssLenCurrent := len(alltss.TssList) - for { - select { - case <-time.After(time.Second): - case <-ctx.Done(): - return - } - tssListNew, err := client.TssHistory(ctx, &observertypes.QueryTssHistoryRequest{}) - if err != nil { - s.logger.Warn().Err(err).Msg("unable to get tss new history") - continue - } - tssLenUpdated := len(tssListNew.TssList) - - if tssLenUpdated == tssLenCurrent { - continue - } - if tssLenUpdated < tssLenCurrent { - tssLenCurrent = len(tssListNew.TssList) - continue - } - - tssLenCurrent = tssLenUpdated - s.logger.Info().Msgf("tss list updated from %d to %d", tssLenCurrent, tssLenUpdated) - time.Sleep(5 * time.Second) - s.logger.Info().Msg("restarting zetaclientd to update tss list") - s.restartChan <- syscall.SIGHUP - } - } - s.logger.Warn().Msg("handleNewTSSKeyGeneration exiting without success") -} - -func (s *zetaclientdSupervisor) handleNewKeygen(ctx context.Context) { - client := observertypes.NewQueryClient(s.zetacoredConn) - prevKeygenBlock := int64(0) - for { - select { - case <-time.After(time.Second): - case <-ctx.Done(): - return - } - resp, err := client.Keygen(ctx, &observertypes.QueryGetKeygenRequest{}) - if err != nil { - s.logger.Warn().Err(err).Msg("unable to get keygen") - continue - } - if resp.Keygen == nil { - s.logger.Warn().Err(err).Msg("keygen is nil") - continue - } - - if resp.Keygen.Status != observertypes.KeygenStatus_PendingKeygen { - continue - } - keygenBlock := resp.Keygen.BlockNumber - if prevKeygenBlock == keygenBlock { - continue - } - prevKeygenBlock = keygenBlock - s.logger.Info().Msgf("got new keygen at block %d", keygenBlock) - s.restartChan <- syscall.SIGHUP - } -} func (s *zetaclientdSupervisor) handleCoreUpgradePlan(ctx context.Context) { client := upgradetypes.NewQueryClient(s.zetacoredConn) @@ -345,8 +225,8 @@ func (s *zetaclientdSupervisor) downloadZetaclientd(ctx context.Context, plan *u if plan.Info == "" { return errors.New("upgrade info empty") } - var config upgradeConfig - err := json.Unmarshal([]byte(plan.Info), &config) + var cfg upgradeConfig + err := json.Unmarshal([]byte(plan.Info), &cfg) if err != nil { return fmt.Errorf("unmarshal upgrade config: %w", err) } @@ -354,7 +234,7 @@ func (s *zetaclientdSupervisor) downloadZetaclientd(ctx context.Context, plan *u s.logger.Info().Msg("downloading zetaclientd") binKey := fmt.Sprintf("%s-%s/%s", zetaclientdBinaryName, runtime.GOOS, runtime.GOARCH) - binURL, ok := config.Binaries[binKey] + binURL, ok := cfg.Binaries[binKey] if !ok { return fmt.Errorf("no binary found for: %s", binKey) } diff --git a/cmd/zetaclientd/start.go b/cmd/zetaclientd/start.go index b3230e02c8..83a76ad80f 100644 --- a/cmd/zetaclientd/start.go +++ b/cmd/zetaclientd/start.go @@ -326,7 +326,7 @@ func start(_ *cobra.Command, _ []string) error { } // Orchestrator wraps the zetacore client and adds the observers and signer maps to it . This is the high level object used for CCTX interactions - orchestrator := orchestrator.NewOrchestrator( + cctxOrchestrator := orchestrator.NewOrchestrator( ctx, zetacoreClient, signerMap, @@ -335,7 +335,7 @@ func start(_ *cobra.Command, _ []string) error { telemetryServer, ) - err = orchestrator.MonitorCore(ctx) + err = cctxOrchestrator.MonitorCore(ctx) if err != nil { startLogger.Error().Err(err).Msg("Orchestrator failed to start") return err @@ -435,6 +435,9 @@ func promptPasswords() (string, string, error) { return hotKeyPass, TSSKeyPass, err } +// startBackgroundThreads: This function will start background threads. +// These threads are responsible for handling TSS updates, new keygen, and new TSS key generation. +// These threads are provided with a cancel function which is used to restart the main thread based on the outcome of the background task. func startBackgroundThreads(ctx context.Context, cancelFunc context.CancelCauseFunc, client *zetacore.Client, masterLogger zerolog.Logger) context.CancelFunc { backgroundContext, cancel := context.WithCancel(ctx) bg.Work(backgroundContext, client.HandleTSSUpdate, bg.WithName("HandleTSSUpdate"), bg.WithLogger(masterLogger), bg.WithCancel(cancelFunc)) diff --git a/pkg/bg/bg.go b/pkg/bg/bg.go index 4b76ff8312..27f36eef68 100644 --- a/pkg/bg/bg.go +++ b/pkg/bg/bg.go @@ -3,7 +3,6 @@ package bg import ( "context" - "errors" "fmt" "github.com/rs/zerolog" @@ -53,8 +52,11 @@ func Work(ctx context.Context, f func(context.Context) error, opts ...Opt) { if err != nil { logError(err, cfg) } + + // Use cancel function if it is provided. + // This is used for restarting the main thread based on the outcome of the background task if cfg.cancel != nil && err == nil { - cfg.cancel(errors.New(fmt.Sprintf("function : %s triggered restart", cfg.name))) + cfg.cancel(fmt.Errorf("function : %s triggered restart", cfg.name)) } }() } diff --git a/zetaclient/zetacore/background_threads.go b/zetaclient/zetacore/background_threads.go index b2083eb5a5..8e6cf916d2 100644 --- a/zetaclient/zetacore/background_threads.go +++ b/zetaclient/zetacore/background_threads.go @@ -11,6 +11,7 @@ import ( zctx "github.com/zeta-chain/zetacore/zetaclient/context" ) +// HandleTSSUpdate is a background thread that listens for TSS updates; it returns when the TSS address is updated func (c *Client) HandleTSSUpdate(ctx context.Context) error { app, err := zctx.FromContext(ctx) if err != nil { @@ -60,6 +61,8 @@ func (c *Client) HandleTSSUpdate(ctx context.Context) error { } } +// HandleNewTSSKeyGeneration is a background thread that listens for new TSS key generation; it returns when a new key is generated +// It uses the length of the TSS list to determine if a new key is generated func (c *Client) HandleNewTSSKeyGeneration(ctx context.Context) error { app, err := zctx.FromContext(ctx) if err != nil { @@ -93,11 +96,7 @@ func (c *Client) HandleNewTSSKeyGeneration(ctx context.Context) error { } tssLenUpdated := len(tssHistoricalListNew) - if tssLenUpdated == tssLen { - continue - } - if tssLenUpdated < tssLen { - tssLen = tssLenUpdated + if tssLenUpdated <= tssLen { continue } logger.Info().Msgf("tss list updated from %d to %d", tssLen, tssLenUpdated) @@ -112,6 +111,8 @@ func (c *Client) HandleNewTSSKeyGeneration(ctx context.Context) error { } } } + +// HandleNewKeygen is a background thread that listens for new keygen; it returns when a new keygen is set func (c *Client) HandleNewKeygen(ctx context.Context) error { app, err := zctx.FromContext(ctx) if err != nil { From ee16242f66f84191a27235805ccefdf3d15e1a2b Mon Sep 17 00:00:00 2001 From: Tanmay Date: Tue, 23 Jul 2024 21:22:29 -0400 Subject: [PATCH 04/20] remove commented code --- cmd/zetaclientd-supervisor/lib.go | 3 --- cmd/zetaclientd-supervisor/main.go | 2 -- cmd/zetaclientd/start.go | 4 ++-- zetaclient/orchestrator/orchestrator.go | 3 --- zetaclient/zetacore/background_threads.go | 4 ++-- zetaclient/zetacore/client_query_observer.go | 1 - 6 files changed, 4 insertions(+), 13 deletions(-) diff --git a/cmd/zetaclientd-supervisor/lib.go b/cmd/zetaclientd-supervisor/lib.go index 15644cdc61..e65782c6b1 100644 --- a/cmd/zetaclientd-supervisor/lib.go +++ b/cmd/zetaclientd-supervisor/lib.go @@ -98,9 +98,6 @@ func newZetaclientdSupervisor( func (s *zetaclientdSupervisor) Start(ctx context.Context) { go s.watchForVersionChanges(ctx) go s.handleCoreUpgradePlan(ctx) - //go s.handleNewKeygen(ctx) - //go s.handleNewTSSKeyGeneration(ctx) - //go s.handleTSSUpdate(ctx) } func (s *zetaclientdSupervisor) WaitForReloadSignal(ctx context.Context) { diff --git a/cmd/zetaclientd-supervisor/main.go b/cmd/zetaclientd-supervisor/main.go index b72a92a94f..ed5529dcdc 100644 --- a/cmd/zetaclientd-supervisor/main.go +++ b/cmd/zetaclientd-supervisor/main.go @@ -69,7 +69,6 @@ func main() { cmd.Stdin = &passwordInputBuffer eg, ctx := errgroup.WithContext(ctx) - //eg.Go(cmd.Run) eg.Go(func() error { err := cmd.Run() if err != nil { @@ -82,7 +81,6 @@ func main() { }) eg.Go(func() error { supervisor.WaitForReloadSignal(ctx) - //cancel() return nil }) eg.Go(func() error { diff --git a/cmd/zetaclientd/start.go b/cmd/zetaclientd/start.go index 83a76ad80f..9c23161001 100644 --- a/cmd/zetaclientd/start.go +++ b/cmd/zetaclientd/start.go @@ -358,7 +358,7 @@ func start(_ *cobra.Command, _ []string) error { ch := make(chan os.Signal, 1) signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) - startLogger.Info().Msgf("awaiting the os.Interrupt, syscall.SIGTERM signals...") + startLogger.Info().Msgf("awaiting shutdown signals") select { case <-notifyCtx.Done(): cause := context.Cause(notifyCtx) @@ -367,7 +367,7 @@ func start(_ *cobra.Command, _ []string) error { case sig := <-ch: startLogger.Info().Msgf("stop signal received: %s", sig) } - //cancelBackgroundThreads() + //stop chain observers for _, observer := range observerMap { observer.Stop() diff --git a/zetaclient/orchestrator/orchestrator.go b/zetaclient/orchestrator/orchestrator.go index cf277ca29e..87f537cf0a 100644 --- a/zetaclient/orchestrator/orchestrator.go +++ b/zetaclient/orchestrator/orchestrator.go @@ -116,9 +116,6 @@ func (oc *Orchestrator) MonitorCore(ctx context.Context) error { shutdownOrchestrator := func() { // now stop orchestrator and all observers close(oc.stop) - //for _, c := range oc.observerMap { - // c.Stop() - //} } oc.zetacoreClient.OnBeforeStop(shutdownOrchestrator) diff --git a/zetaclient/zetacore/background_threads.go b/zetaclient/zetacore/background_threads.go index 8e6cf916d2..82b25b343e 100644 --- a/zetaclient/zetacore/background_threads.go +++ b/zetaclient/zetacore/background_threads.go @@ -74,7 +74,7 @@ func (c *Client) HandleNewTSSKeyGeneration(ctx context.Context) error { bo := backoff.NewConstantBackOff(5 * time.Second) backoff.WithMaxRetries(bo, 10) - // Initial TSS retrieval + // Initial TSS history retrieval tssHistoricalList, err := retry.DoTypedWithBackoffAndRetry[[]observertypes.TSS](func() ([]observertypes.TSS, error) { return c.GetTSSHistory(ctx) }, bo) @@ -123,7 +123,7 @@ func (c *Client) HandleNewKeygen(ctx context.Context) error { bo := backoff.NewConstantBackOff(5 * time.Second) backoff.WithMaxRetries(bo, 10) - // Initial TSS retrieval + // Initial Keygen retrieval keygen, err := retry.DoTypedWithBackoffAndRetry[*observertypes.Keygen](func() (*observertypes.Keygen, error) { return c.GetKeyGen(ctx) }, bo) diff --git a/zetaclient/zetacore/client_query_observer.go b/zetaclient/zetacore/client_query_observer.go index e14fae5e0c..ca59908358 100644 --- a/zetaclient/zetacore/client_query_observer.go +++ b/zetaclient/zetacore/client_query_observer.go @@ -172,7 +172,6 @@ func (c *Client) GetTSS(ctx context.Context) (types.TSS, error) { if err != nil { return types.TSS{}, errors.Wrap(err, "failed to get tss") } - return resp.TSS, nil } From a0e0df516d6c9b90aa4055e7dca1fd35eecf49e3 Mon Sep 17 00:00:00 2001 From: Tanmay Date: Tue, 23 Jul 2024 21:25:23 -0400 Subject: [PATCH 05/20] add default constant backoff --- pkg/retry/retry.go | 5 +++++ zetaclient/zetacore/background_threads.go | 16 +++------------- 2 files changed, 8 insertions(+), 13 deletions(-) diff --git a/pkg/retry/retry.go b/pkg/retry/retry.go index 291ceafe23..d3001691d6 100644 --- a/pkg/retry/retry.go +++ b/pkg/retry/retry.go @@ -46,6 +46,11 @@ func DefaultBackoff() Backoff { return backoff.WithMaxRetries(bo, 5) } +func DefaultConstantBackoff() Backoff { + bo := backoff.NewConstantBackOff(5 * time.Second) + return backoff.WithMaxRetries(bo, 10) +} + // Do executes the callback function with the default backoff config. // It will retry a callback ONLY if error is retryable. func Do(cb Callback) error { diff --git a/zetaclient/zetacore/background_threads.go b/zetaclient/zetacore/background_threads.go index 82b25b343e..7eb658cbb6 100644 --- a/zetaclient/zetacore/background_threads.go +++ b/zetaclient/zetacore/background_threads.go @@ -5,7 +5,6 @@ import ( "time" "cosmossdk.io/errors" - "github.com/cenkalti/backoff/v4" "github.com/zeta-chain/zetacore/pkg/retry" observertypes "github.com/zeta-chain/zetacore/x/observer/types" zctx "github.com/zeta-chain/zetacore/zetaclient/context" @@ -20,13 +19,10 @@ func (c *Client) HandleTSSUpdate(ctx context.Context) error { logger := app.Logger().With().Str("module", "HandleTSSUpdate").Logger() - bo := backoff.NewConstantBackOff(5 * time.Second) - backoff.WithMaxRetries(bo, 10) - // Initial TSS retrieval tss, err := retry.DoTypedWithBackoffAndRetry[observertypes.TSS](func() (observertypes.TSS, error) { return c.GetTSS(ctx) - }, bo) + }, retry.DefaultConstantBackoff()) if err != nil { logger.Warn().Err(err).Msg("unable to get initial tss") return err @@ -71,13 +67,10 @@ func (c *Client) HandleNewTSSKeyGeneration(ctx context.Context) error { logger := app.Logger().With().Str("module", "HandleNewTSSKeyGeneration").Logger() - bo := backoff.NewConstantBackOff(5 * time.Second) - backoff.WithMaxRetries(bo, 10) - // Initial TSS history retrieval tssHistoricalList, err := retry.DoTypedWithBackoffAndRetry[[]observertypes.TSS](func() ([]observertypes.TSS, error) { return c.GetTSSHistory(ctx) - }, bo) + }, retry.DefaultConstantBackoff()) if err != nil { return errors.Wrap(err, "failed to get initial tss history") } @@ -120,13 +113,10 @@ func (c *Client) HandleNewKeygen(ctx context.Context) error { } logger := app.Logger().With().Str("module", "HandleNewKeygen").Logger() - bo := backoff.NewConstantBackOff(5 * time.Second) - backoff.WithMaxRetries(bo, 10) - // Initial Keygen retrieval keygen, err := retry.DoTypedWithBackoffAndRetry[*observertypes.Keygen](func() (*observertypes.Keygen, error) { return c.GetKeyGen(ctx) - }, bo) + }, retry.DefaultConstantBackoff()) if err != nil { return errors.Wrap(err, "failed to get initial tss history") } From 731b03e07cca68da70cac062a64b0c921d8cb030 Mon Sep 17 00:00:00 2001 From: Tanmay Date: Tue, 23 Jul 2024 21:29:37 -0400 Subject: [PATCH 06/20] generate files --- cmd/zetaclientd/start.go | 32 ++++++++++++++++++++--- zetaclient/zetacore/background_threads.go | 10 ++++--- 2 files changed, 35 insertions(+), 7 deletions(-) diff --git a/cmd/zetaclientd/start.go b/cmd/zetaclientd/start.go index 9c23161001..52f88264aa 100644 --- a/cmd/zetaclientd/start.go +++ b/cmd/zetaclientd/start.go @@ -22,6 +22,7 @@ import ( "github.com/rs/zerolog/log" "github.com/spf13/cobra" "github.com/zeta-chain/go-tss/p2p" + "github.com/zeta-chain/zetacore/pkg/authz" "github.com/zeta-chain/zetacore/pkg/bg" "github.com/zeta-chain/zetacore/pkg/chains" @@ -438,10 +439,33 @@ func promptPasswords() (string, string, error) { // startBackgroundThreads: This function will start background threads. // These threads are responsible for handling TSS updates, new keygen, and new TSS key generation. // These threads are provided with a cancel function which is used to restart the main thread based on the outcome of the background task. -func startBackgroundThreads(ctx context.Context, cancelFunc context.CancelCauseFunc, client *zetacore.Client, masterLogger zerolog.Logger) context.CancelFunc { +func startBackgroundThreads( + ctx context.Context, + cancelFunc context.CancelCauseFunc, + client *zetacore.Client, + masterLogger zerolog.Logger, +) context.CancelFunc { backgroundContext, cancel := context.WithCancel(ctx) - bg.Work(backgroundContext, client.HandleTSSUpdate, bg.WithName("HandleTSSUpdate"), bg.WithLogger(masterLogger), bg.WithCancel(cancelFunc)) - bg.Work(backgroundContext, client.HandleNewKeygen, bg.WithName("HandleNewKeygen"), bg.WithLogger(masterLogger), bg.WithCancel(cancelFunc)) - bg.Work(backgroundContext, client.HandleNewTSSKeyGeneration, bg.WithName("HandleNewTSSKeyGeneration"), bg.WithLogger(masterLogger), bg.WithCancel(cancelFunc)) + bg.Work( + backgroundContext, + client.HandleTSSUpdate, + bg.WithName("HandleTSSUpdate"), + bg.WithLogger(masterLogger), + bg.WithCancel(cancelFunc), + ) + bg.Work( + backgroundContext, + client.HandleNewKeygen, + bg.WithName("HandleNewKeygen"), + bg.WithLogger(masterLogger), + bg.WithCancel(cancelFunc), + ) + bg.Work( + backgroundContext, + client.HandleNewTSSKeyGeneration, + bg.WithName("HandleNewTSSKeyGeneration"), + bg.WithLogger(masterLogger), + bg.WithCancel(cancelFunc), + ) return cancel } diff --git a/zetaclient/zetacore/background_threads.go b/zetaclient/zetacore/background_threads.go index 7eb658cbb6..cf636c0579 100644 --- a/zetaclient/zetacore/background_threads.go +++ b/zetaclient/zetacore/background_threads.go @@ -5,6 +5,7 @@ import ( "time" "cosmossdk.io/errors" + "github.com/zeta-chain/zetacore/pkg/retry" observertypes "github.com/zeta-chain/zetacore/x/observer/types" zctx "github.com/zeta-chain/zetacore/zetaclient/context" @@ -68,9 +69,12 @@ func (c *Client) HandleNewTSSKeyGeneration(ctx context.Context) error { logger := app.Logger().With().Str("module", "HandleNewTSSKeyGeneration").Logger() // Initial TSS history retrieval - tssHistoricalList, err := retry.DoTypedWithBackoffAndRetry[[]observertypes.TSS](func() ([]observertypes.TSS, error) { - return c.GetTSSHistory(ctx) - }, retry.DefaultConstantBackoff()) + tssHistoricalList, err := retry.DoTypedWithBackoffAndRetry[[]observertypes.TSS]( + func() ([]observertypes.TSS, error) { + return c.GetTSSHistory(ctx) + }, + retry.DefaultConstantBackoff(), + ) if err != nil { return errors.Wrap(err, "failed to get initial tss history") } From b62938a0a7be294dae44560940fbadc253510b4d Mon Sep 17 00:00:00 2001 From: Tanmay Date: Wed, 24 Jul 2024 09:32:56 -0400 Subject: [PATCH 07/20] resolve comments 1 --- cmd/zetaclientd-supervisor/main.go | 12 ++--- cmd/zetaclientd/start.go | 44 ++-------------- pkg/bg/bg.go | 7 ++- ...ds.go => client_tss_migration_listners.go} | 51 +++++++++++++++---- 4 files changed, 54 insertions(+), 60 deletions(-) rename zetaclient/zetacore/{background_threads.go => client_tss_migration_listners.go} (72%) diff --git a/cmd/zetaclientd-supervisor/main.go b/cmd/zetaclientd-supervisor/main.go index ed5529dcdc..35e8bef75b 100644 --- a/cmd/zetaclientd-supervisor/main.go +++ b/cmd/zetaclientd-supervisor/main.go @@ -70,14 +70,14 @@ func main() { eg, ctx := errgroup.WithContext(ctx) eg.Go(func() error { - err := cmd.Run() - if err != nil { + defer cancel() + if err := cmd.Run(); err != nil { logger.Error().Err(err).Msg("zetaclient process exited with error") - } else { - logger.Info().Msg("zetaclient process exited") + return err } - cancel() // Signal other goroutines to exit - return err + + logger.Info().Msg("zetaclient process exited") + return nil }) eg.Go(func() error { supervisor.WaitForReloadSignal(ctx) diff --git a/cmd/zetaclientd/start.go b/cmd/zetaclientd/start.go index 52f88264aa..42a559cf99 100644 --- a/cmd/zetaclientd/start.go +++ b/cmd/zetaclientd/start.go @@ -18,13 +18,11 @@ import ( "github.com/libp2p/go-libp2p/core" maddr "github.com/multiformats/go-multiaddr" "github.com/pkg/errors" - "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/spf13/cobra" "github.com/zeta-chain/go-tss/p2p" "github.com/zeta-chain/zetacore/pkg/authz" - "github.com/zeta-chain/zetacore/pkg/bg" "github.com/zeta-chain/zetacore/pkg/chains" "github.com/zeta-chain/zetacore/pkg/constant" observerTypes "github.com/zeta-chain/zetacore/x/observer/types" @@ -34,7 +32,6 @@ import ( "github.com/zeta-chain/zetacore/zetaclient/metrics" "github.com/zeta-chain/zetacore/zetaclient/orchestrator" mc "github.com/zeta-chain/zetacore/zetaclient/tss" - "github.com/zeta-chain/zetacore/zetaclient/zetacore" ) type Multiaddr = core.Multiaddr @@ -213,11 +210,12 @@ func start(_ *cobra.Command, _ []string) error { // Set P2P ID for telemetry telemetryServer.SetP2PID(server.GetLocalPeerID()) - // Create a notification context for background threads.These threads are responsible for sending shutdown signals to the main thread. + // Create a notification context for background threads. + // These threads are responsible for sending shutdown signals to the main thread. notifyCtx, cancel := context.WithCancelCause(ctx) - // Start background threads - defer startBackgroundThreads(notifyCtx, cancel, zetacoreClient, masterLogger) + // Start background threads which monitors zeta core for state changes related to TSS migration + defer zetacoreClient.StartTssMigrationRoutines(notifyCtx, cancel, masterLogger) // Generate a new TSS if keygen is set and add it into the tss server // If TSS has already been generated, and keygen was successful ; we use the existing TSS @@ -435,37 +433,3 @@ func promptPasswords() (string, string, error) { return hotKeyPass, TSSKeyPass, err } - -// startBackgroundThreads: This function will start background threads. -// These threads are responsible for handling TSS updates, new keygen, and new TSS key generation. -// These threads are provided with a cancel function which is used to restart the main thread based on the outcome of the background task. -func startBackgroundThreads( - ctx context.Context, - cancelFunc context.CancelCauseFunc, - client *zetacore.Client, - masterLogger zerolog.Logger, -) context.CancelFunc { - backgroundContext, cancel := context.WithCancel(ctx) - bg.Work( - backgroundContext, - client.HandleTSSUpdate, - bg.WithName("HandleTSSUpdate"), - bg.WithLogger(masterLogger), - bg.WithCancel(cancelFunc), - ) - bg.Work( - backgroundContext, - client.HandleNewKeygen, - bg.WithName("HandleNewKeygen"), - bg.WithLogger(masterLogger), - bg.WithCancel(cancelFunc), - ) - bg.Work( - backgroundContext, - client.HandleNewTSSKeyGeneration, - bg.WithName("HandleNewTSSKeyGeneration"), - bg.WithLogger(masterLogger), - bg.WithCancel(cancelFunc), - ) - return cancel -} diff --git a/pkg/bg/bg.go b/pkg/bg/bg.go index 27f36eef68..20e5eca1fa 100644 --- a/pkg/bg/bg.go +++ b/pkg/bg/bg.go @@ -52,11 +52,10 @@ func Work(ctx context.Context, f func(context.Context) error, opts ...Opt) { if err != nil { logError(err, cfg) } - // Use cancel function if it is provided. - // This is used for restarting the main thread based on the outcome of the background task - if cfg.cancel != nil && err == nil { - cfg.cancel(fmt.Errorf("function : %s triggered restart", cfg.name)) + // This is used for stopping the main thread based on the outcome of the background task. + if cfg.cancel != nil { + cfg.cancel(fmt.Errorf("cancel function triggered by %s", cfg.name)) } }() } diff --git a/zetaclient/zetacore/background_threads.go b/zetaclient/zetacore/client_tss_migration_listners.go similarity index 72% rename from zetaclient/zetacore/background_threads.go rename to zetaclient/zetacore/client_tss_migration_listners.go index cf636c0579..cb106cbdd1 100644 --- a/zetaclient/zetacore/background_threads.go +++ b/zetaclient/zetacore/client_tss_migration_listners.go @@ -5,20 +5,54 @@ import ( "time" "cosmossdk.io/errors" - + "github.com/rs/zerolog" + "github.com/zeta-chain/zetacore/pkg/bg" "github.com/zeta-chain/zetacore/pkg/retry" observertypes "github.com/zeta-chain/zetacore/x/observer/types" zctx "github.com/zeta-chain/zetacore/zetaclient/context" ) +// startBackgroundRoutines: This function will start background threads. +// These threads are responsible for handling TSS updates, new keygen, and new TSS key generation. +// These threads are provided with a cancel function which is used to restart the main thread based on the outcome of the background task. +func (c *Client) StartTssMigrationRoutines( + ctx context.Context, + cancelFunc context.CancelCauseFunc, + masterLogger zerolog.Logger, +) context.CancelFunc { + backgroundContext, cancel := context.WithCancel(ctx) + bg.Work( + backgroundContext, + c.HandleTSSUpdate, + bg.WithName("HandleTSSUpdate"), + bg.WithLogger(masterLogger), + bg.WithCancel(cancelFunc), + ) + bg.Work( + backgroundContext, + c.HandleNewKeygen, + bg.WithName("HandleNewKeygen"), + bg.WithLogger(masterLogger), + bg.WithCancel(cancelFunc), + ) + bg.Work( + backgroundContext, + c.HandleNewTSSKeyGeneration, + bg.WithName("HandleNewTSSKeyGeneration"), + bg.WithLogger(masterLogger), + bg.WithCancel(cancelFunc), + ) + return cancel +} + // HandleTSSUpdate is a background thread that listens for TSS updates; it returns when the TSS address is updated func (c *Client) HandleTSSUpdate(ctx context.Context) error { - app, err := zctx.FromContext(ctx) + appCtx, err := zctx.FromContext(ctx) if err != nil { return errors.Wrap(err, "failed to get app context") } - logger := app.Logger().With().Str("module", "HandleTSSUpdate").Logger() + logger := appCtx.Logger().With().Str("module", "HandleTSSUpdate").Logger() // Initial TSS retrieval tss, err := retry.DoTypedWithBackoffAndRetry[observertypes.TSS](func() (observertypes.TSS, error) { @@ -46,7 +80,6 @@ func (c *Client) HandleTSSUpdate(ctx context.Context) error { continue } logger.Info().Msgf("tss address is updated from %s to %s", tss.TssPubkey, tssNew.TssPubkey) - tss = tssNew return nil } case <-ctx.Done(): @@ -61,12 +94,12 @@ func (c *Client) HandleTSSUpdate(ctx context.Context) error { // HandleNewTSSKeyGeneration is a background thread that listens for new TSS key generation; it returns when a new key is generated // It uses the length of the TSS list to determine if a new key is generated func (c *Client) HandleNewTSSKeyGeneration(ctx context.Context) error { - app, err := zctx.FromContext(ctx) + appCtx, err := zctx.FromContext(ctx) if err != nil { return errors.Wrap(err, "failed to get app context") } - logger := app.Logger().With().Str("module", "HandleNewTSSKeyGeneration").Logger() + logger := appCtx.Logger().With().Str("module", "HandleNewTSSKeyGeneration").Logger() // Initial TSS history retrieval tssHistoricalList, err := retry.DoTypedWithBackoffAndRetry[[]observertypes.TSS]( @@ -97,7 +130,6 @@ func (c *Client) HandleNewTSSKeyGeneration(ctx context.Context) error { continue } logger.Info().Msgf("tss list updated from %d to %d", tssLen, tssLenUpdated) - tssLen = tssLenUpdated return nil } case <-ctx.Done(): @@ -111,11 +143,11 @@ func (c *Client) HandleNewTSSKeyGeneration(ctx context.Context) error { // HandleNewKeygen is a background thread that listens for new keygen; it returns when a new keygen is set func (c *Client) HandleNewKeygen(ctx context.Context) error { - app, err := zctx.FromContext(ctx) + appCtx, err := zctx.FromContext(ctx) if err != nil { return err } - logger := app.Logger().With().Str("module", "HandleNewKeygen").Logger() + logger := appCtx.Logger().With().Str("module", "HandleNewKeygen").Logger() // Initial Keygen retrieval keygen, err := retry.DoTypedWithBackoffAndRetry[*observertypes.Keygen](func() (*observertypes.Keygen, error) { @@ -148,7 +180,6 @@ func (c *Client) HandleNewKeygen(ctx context.Context) error { continue } - keygen = keygenUpdated logger.Info().Msgf("got new keygen at block %d", keygen.BlockNumber) return nil } From 2cce32c2fefc3ef7be37c797a47d3a6cd9a51905 Mon Sep 17 00:00:00 2001 From: Tanmay Date: Wed, 24 Jul 2024 09:36:24 -0400 Subject: [PATCH 08/20] resolve comments 1 --- cmd/zetaclientd/start.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cmd/zetaclientd/start.go b/cmd/zetaclientd/start.go index 42a559cf99..c6101bf2db 100644 --- a/cmd/zetaclientd/start.go +++ b/cmd/zetaclientd/start.go @@ -215,7 +215,8 @@ func start(_ *cobra.Command, _ []string) error { notifyCtx, cancel := context.WithCancelCause(ctx) // Start background threads which monitors zeta core for state changes related to TSS migration - defer zetacoreClient.StartTssMigrationRoutines(notifyCtx, cancel, masterLogger) + cancelBackgroundThreads := zetacoreClient.StartTssMigrationRoutines(notifyCtx, cancel, masterLogger) + defer cancelBackgroundThreads() // Generate a new TSS if keygen is set and add it into the tss server // If TSS has already been generated, and keygen was successful ; we use the existing TSS From 31e854c506c0b6ef0e9b12cff4b7671b669bcf73 Mon Sep 17 00:00:00 2001 From: Tanmay Date: Wed, 24 Jul 2024 10:50:43 -0400 Subject: [PATCH 09/20] resolve comments 2 --- cmd/zetaclientd-supervisor/main.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/zetaclientd-supervisor/main.go b/cmd/zetaclientd-supervisor/main.go index 35e8bef75b..d7179d6948 100644 --- a/cmd/zetaclientd-supervisor/main.go +++ b/cmd/zetaclientd-supervisor/main.go @@ -10,6 +10,7 @@ import ( "syscall" "time" + "cosmossdk.io/errors" "golang.org/x/sync/errgroup" "github.com/zeta-chain/zetacore/app" @@ -72,8 +73,7 @@ func main() { eg.Go(func() error { defer cancel() if err := cmd.Run(); err != nil { - logger.Error().Err(err).Msg("zetaclient process exited with error") - return err + return errors.Wrap(err, "zetaclient process failed") } logger.Info().Msg("zetaclient process exited") From cae80f2212cba3582bd1a0e34836a8254727b2d5 Mon Sep 17 00:00:00 2001 From: Tanmay Date: Wed, 24 Jul 2024 14:17:14 -0400 Subject: [PATCH 10/20] rename to callback to clarify terminiology --- pkg/bg/bg.go | 20 +++++++++---------- .../zetacore/client_tss_migration_listners.go | 14 ++++++------- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/pkg/bg/bg.go b/pkg/bg/bg.go index 20e5eca1fa..1dafb26219 100644 --- a/pkg/bg/bg.go +++ b/pkg/bg/bg.go @@ -9,9 +9,9 @@ import ( ) type config struct { - name string - logger zerolog.Logger - cancel context.CancelCauseFunc + name string + logger zerolog.Logger + callback context.CancelCauseFunc } type Opt func(*config) @@ -20,8 +20,8 @@ func WithName(name string) Opt { return func(cfg *config) { cfg.name = name } } -func WithCancel(cancel context.CancelCauseFunc) Opt { - return func(cfg *config) { cfg.cancel = cancel } +func WithCallback(cancel context.CancelCauseFunc) Opt { + return func(cfg *config) { cfg.callback = cancel } } func WithLogger(logger zerolog.Logger) Opt { @@ -31,9 +31,9 @@ func WithLogger(logger zerolog.Logger) Opt { // Work emits a new task in the background func Work(ctx context.Context, f func(context.Context) error, opts ...Opt) { cfg := config{ - name: "", - logger: zerolog.Nop(), - cancel: nil, + name: "", + logger: zerolog.Nop(), + callback: nil, } for _, opt := range opts { @@ -54,8 +54,8 @@ func Work(ctx context.Context, f func(context.Context) error, opts ...Opt) { } // Use cancel function if it is provided. // This is used for stopping the main thread based on the outcome of the background task. - if cfg.cancel != nil { - cfg.cancel(fmt.Errorf("cancel function triggered by %s", cfg.name)) + if cfg.callback != nil { + cfg.callback(fmt.Errorf("callback function triggered for %s", cfg.name)) } }() } diff --git a/zetaclient/zetacore/client_tss_migration_listners.go b/zetaclient/zetacore/client_tss_migration_listners.go index cb106cbdd1..915c4fceaa 100644 --- a/zetaclient/zetacore/client_tss_migration_listners.go +++ b/zetaclient/zetacore/client_tss_migration_listners.go @@ -20,27 +20,27 @@ func (c *Client) StartTssMigrationRoutines( cancelFunc context.CancelCauseFunc, masterLogger zerolog.Logger, ) context.CancelFunc { - backgroundContext, cancel := context.WithCancel(ctx) + migrationRoutinesContext, cancel := context.WithCancel(ctx) bg.Work( - backgroundContext, + migrationRoutinesContext, c.HandleTSSUpdate, bg.WithName("HandleTSSUpdate"), bg.WithLogger(masterLogger), - bg.WithCancel(cancelFunc), + bg.WithCallback(cancelFunc), ) bg.Work( - backgroundContext, + migrationRoutinesContext, c.HandleNewKeygen, bg.WithName("HandleNewKeygen"), bg.WithLogger(masterLogger), - bg.WithCancel(cancelFunc), + bg.WithCallback(cancelFunc), ) bg.Work( - backgroundContext, + migrationRoutinesContext, c.HandleNewTSSKeyGeneration, bg.WithName("HandleNewTSSKeyGeneration"), bg.WithLogger(masterLogger), - bg.WithCancel(cancelFunc), + bg.WithCallback(cancelFunc), ) return cancel } From 6834fa806db4a990ac37936635aff45951dcc65d Mon Sep 17 00:00:00 2001 From: Tanmay Date: Wed, 24 Jul 2024 19:36:15 -0400 Subject: [PATCH 11/20] remove cancel cause --- cmd/zetaclientd/start.go | 16 +++++----------- pkg/bg/bg.go | 7 ++++--- .../zetacore/client_tss_migration_listners.go | 2 +- 3 files changed, 10 insertions(+), 15 deletions(-) diff --git a/cmd/zetaclientd/start.go b/cmd/zetaclientd/start.go index c6101bf2db..b98b4738f1 100644 --- a/cmd/zetaclientd/start.go +++ b/cmd/zetaclientd/start.go @@ -212,7 +212,7 @@ func start(_ *cobra.Command, _ []string) error { // Create a notification context for background threads. // These threads are responsible for sending shutdown signals to the main thread. - notifyCtx, cancel := context.WithCancelCause(ctx) + notifyCtx, cancel := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM) // Start background threads which monitors zeta core for state changes related to TSS migration cancelBackgroundThreads := zetacoreClient.StartTssMigrationRoutines(notifyCtx, cancel, masterLogger) @@ -355,24 +355,18 @@ func start(_ *cobra.Command, _ []string) error { // defer zetaSupplyChecker.Stop() //} - ch := make(chan os.Signal, 1) - signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) + startLogger.Info().Msgf("zetaclientd is running") - startLogger.Info().Msgf("awaiting shutdown signals") - select { - case <-notifyCtx.Done(): - cause := context.Cause(notifyCtx) - startLogger.Info().Msgf("shutdown signal received , cause : %s", cause) + <-notifyCtx.Done() - case sig := <-ch: - startLogger.Info().Msgf("stop signal received: %s", sig) - } + startLogger.Info().Msgf("initiating zetaclientd shut down") //stop chain observers for _, observer := range observerMap { observer.Stop() } zetacoreClient.Stop() + startLogger.Info().Msgf("zetaclientd stopped") return nil } diff --git a/pkg/bg/bg.go b/pkg/bg/bg.go index 1dafb26219..cacc9f2e8f 100644 --- a/pkg/bg/bg.go +++ b/pkg/bg/bg.go @@ -11,7 +11,7 @@ import ( type config struct { name string logger zerolog.Logger - callback context.CancelCauseFunc + callback context.CancelFunc } type Opt func(*config) @@ -20,7 +20,7 @@ func WithName(name string) Opt { return func(cfg *config) { cfg.name = name } } -func WithCallback(cancel context.CancelCauseFunc) Opt { +func WithCallback(cancel context.CancelFunc) Opt { return func(cfg *config) { cfg.callback = cancel } } @@ -55,7 +55,8 @@ func Work(ctx context.Context, f func(context.Context) error, opts ...Opt) { // Use cancel function if it is provided. // This is used for stopping the main thread based on the outcome of the background task. if cfg.callback != nil { - cfg.callback(fmt.Errorf("callback function triggered for %s", cfg.name)) + cfg.logger.Info().Msgf("background task completed for %s", cfg.name) + cfg.callback() } }() } diff --git a/zetaclient/zetacore/client_tss_migration_listners.go b/zetaclient/zetacore/client_tss_migration_listners.go index 915c4fceaa..0fb2812e07 100644 --- a/zetaclient/zetacore/client_tss_migration_listners.go +++ b/zetaclient/zetacore/client_tss_migration_listners.go @@ -17,7 +17,7 @@ import ( // These threads are provided with a cancel function which is used to restart the main thread based on the outcome of the background task. func (c *Client) StartTssMigrationRoutines( ctx context.Context, - cancelFunc context.CancelCauseFunc, + cancelFunc context.CancelFunc, masterLogger zerolog.Logger, ) context.CancelFunc { migrationRoutinesContext, cancel := context.WithCancel(ctx) From 6b058e2de7ac7bf11d4507ef2ed267bc56ec889f Mon Sep 17 00:00:00 2001 From: Tanmay Date: Sun, 28 Jul 2024 10:09:28 -0400 Subject: [PATCH 12/20] generate files --- changelog.md | 1 + zetaclient/zetacore/client_tss_migration_listners.go | 1 + 2 files changed, 2 insertions(+) diff --git a/changelog.md b/changelog.md index c34e6dbca1..124538631e 100644 --- a/changelog.md +++ b/changelog.md @@ -36,6 +36,7 @@ * [2518](https://github.com/zeta-chain/node/pull/2518) - add support for Solana address in zetacore * [2483](https://github.com/zeta-chain/node/pull/2483) - add priorityFee (gasTipCap) gas to the state * [2567](https://github.com/zeta-chain/node/pull/2567) - add sign latency metric to zetaclient (zetaclient_sign_latency) +* [2538](https://github.com/zeta-chain/node/pull/2538) - add background worker to shutdown zetaclientd when needed for tss migration ### Refactor diff --git a/zetaclient/zetacore/client_tss_migration_listners.go b/zetaclient/zetacore/client_tss_migration_listners.go index 0fb2812e07..ffb59ad87b 100644 --- a/zetaclient/zetacore/client_tss_migration_listners.go +++ b/zetaclient/zetacore/client_tss_migration_listners.go @@ -6,6 +6,7 @@ import ( "cosmossdk.io/errors" "github.com/rs/zerolog" + "github.com/zeta-chain/zetacore/pkg/bg" "github.com/zeta-chain/zetacore/pkg/retry" observertypes "github.com/zeta-chain/zetacore/x/observer/types" From 0c30b3d0c640eedf4d45666a541f16ee44b4e174 Mon Sep 17 00:00:00 2001 From: Tanmay Date: Wed, 31 Jul 2024 11:26:39 -0400 Subject: [PATCH 13/20] move changelog to unreleased --- changelog.md | 4 +++- cmd/zetaclientd/start.go | 2 +- zetaclient/zetacore/client.go | 2 +- zetaclient/zetacore/client_query_observer.go | 17 ++++------------- zetaclient/zetacore/client_query_test.go | 4 ++-- 5 files changed, 11 insertions(+), 18 deletions(-) diff --git a/changelog.md b/changelog.md index 124538631e..0d282312a4 100644 --- a/changelog.md +++ b/changelog.md @@ -2,6 +2,9 @@ ## Unreleased +### Features +* [2538](https://github.com/zeta-chain/node/pull/2538) - add background worker routines to shutdown zetaclientd when needed for tss migration + ### Breaking Changes * [2460](https://github.com/zeta-chain/node/pull/2460) - Upgrade to go 1.22. This required us to temporarily remove the QUIC backend from [go-libp2p](https://github.com/libp2p/go-libp2p). If you are a zetaclient operator and have configured quic peers, you need to switch to tcp peers. @@ -36,7 +39,6 @@ * [2518](https://github.com/zeta-chain/node/pull/2518) - add support for Solana address in zetacore * [2483](https://github.com/zeta-chain/node/pull/2483) - add priorityFee (gasTipCap) gas to the state * [2567](https://github.com/zeta-chain/node/pull/2567) - add sign latency metric to zetaclient (zetaclient_sign_latency) -* [2538](https://github.com/zeta-chain/node/pull/2538) - add background worker to shutdown zetaclientd when needed for tss migration ### Refactor diff --git a/cmd/zetaclientd/start.go b/cmd/zetaclientd/start.go index b9c4d6ce3d..3e53d20ca4 100644 --- a/cmd/zetaclientd/start.go +++ b/cmd/zetaclientd/start.go @@ -261,7 +261,7 @@ func start(_ *cobra.Command, _ []string) error { // Update Current TSS value from zetacore, if TSS keygen is successful, the TSS address is set on zeta-core // Returns err if the RPC call fails as zeta client needs the current TSS address to be set // This is only needed in case of a new Keygen , as the TSS address is set on zetacore only after the keygen is successful i.e enough votes have been broadcast - currentTss, err := zetacoreClient.GetCurrentTSS(ctx) + currentTss, err := zetacoreClient.GetTSS(ctx) if err != nil { startLogger.Error().Err(err).Msg("GetCurrentTSS error") return err diff --git a/zetaclient/zetacore/client.go b/zetaclient/zetacore/client.go index 8789dce26b..7a90491598 100644 --- a/zetaclient/zetacore/client.go +++ b/zetaclient/zetacore/client.go @@ -415,7 +415,7 @@ func (c *Client) UpdateAppContext( return fmt.Errorf("failed to get keygen: %w", err) } - tss, err := c.GetCurrentTSS(ctx) + tss, err := c.GetTSS(ctx) if err != nil { c.logger.Info().Err(err).Msg("Unable to fetch TSS from zetacore") return fmt.Errorf("failed to get current tss: %w", err) diff --git a/zetaclient/zetacore/client_query_observer.go b/zetaclient/zetacore/client_query_observer.go index e6ce5520e6..6e883c9246 100644 --- a/zetaclient/zetacore/client_query_observer.go +++ b/zetaclient/zetacore/client_query_observer.go @@ -136,17 +136,7 @@ func (c *Client) GetBallot( return resp, nil } -// GetCurrentTSS returns the current TSS -func (c *Client) GetCurrentTSS(ctx context.Context) (types.TSS, error) { - resp, err := c.client.observer.TSS(ctx, &types.QueryGetTSSRequest{}) - if err != nil { - return types.TSS{}, errors.Wrap(err, "failed to get current tss") - } - - return resp.TSS, nil -} - -// GetEVMTSSAddress returns the EVM TSS address. +// GetEVMTSSAddress returns the current EVM TSS address. func (c *Client) GetEVMTSSAddress(ctx context.Context) (string, error) { resp, err := c.client.observer.GetTssAddress(ctx, &types.QueryGetTssAddressRequest{}) if err != nil { @@ -156,7 +146,7 @@ func (c *Client) GetEVMTSSAddress(ctx context.Context) (string, error) { return resp.Eth, nil } -// GetBTCTSSAddress returns the BTC TSS address +// GetBTCTSSAddress returns the current BTC TSS address func (c *Client) GetBTCTSSAddress(ctx context.Context, chainID int64) (string, error) { in := &types.QueryGetTssAddressRequest{BitcoinChainId: chainID} @@ -167,6 +157,7 @@ func (c *Client) GetBTCTSSAddress(ctx context.Context, chainID int64) (string, e return resp.Btc, nil } +// GetTSS returns the current TSS func (c *Client) GetTSS(ctx context.Context) (types.TSS, error) { resp, err := c.client.observer.TSS(ctx, &types.QueryGetTSSRequest{}) if err != nil { @@ -175,7 +166,7 @@ func (c *Client) GetTSS(ctx context.Context) (types.TSS, error) { return resp.TSS, nil } -// GetTSSHistory returns the TSS history +// GetTSSHistory returns the historical list of TSS func (c *Client) GetTSSHistory(ctx context.Context) ([]types.TSS, error) { resp, err := c.client.observer.TssHistory(ctx, &types.QueryTssHistoryRequest{}) if err != nil { diff --git a/zetaclient/zetacore/client_query_test.go b/zetaclient/zetacore/client_query_test.go index 2b6a2a1c9c..6d56ee605b 100644 --- a/zetaclient/zetacore/client_query_test.go +++ b/zetaclient/zetacore/client_query_test.go @@ -670,7 +670,7 @@ func TestZetacore_GetInboundTrackersForChain(t *testing.T) { require.Equal(t, expectedOutput.InboundTracker, resp) } -func TestZetacore_GetCurrentTss(t *testing.T) { +func TestZetacore_GetTss(t *testing.T) { ctx := context.Background() expectedOutput := observertypes.QueryGetTSSResponse{ @@ -688,7 +688,7 @@ func TestZetacore_GetCurrentTss(t *testing.T) { client := setupZetacoreClient(t, withDefaultObserverKeys()) - resp, err := client.GetCurrentTSS(ctx) + resp, err := client.GetTSS(ctx) require.NoError(t, err) require.Equal(t, expectedOutput.TSS, resp) } From f993da5aa5db578556741711bbab7f9ae8b732b7 Mon Sep 17 00:00:00 2001 From: Dmitry Date: Wed, 31 Jul 2024 17:43:58 +0200 Subject: [PATCH 14/20] Add bg.OnComplete --- pkg/bg/bg.go | 44 +++++++++++++++++++++++++------------------- pkg/bg/bg_test.go | 30 ++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 19 deletions(-) diff --git a/pkg/bg/bg.go b/pkg/bg/bg.go index 6ef682ec01..791c00787a 100644 --- a/pkg/bg/bg.go +++ b/pkg/bg/bg.go @@ -10,9 +10,9 @@ import ( ) type config struct { - name string - logger zerolog.Logger - callback context.CancelFunc + name string + logger zerolog.Logger + onComplete func() } type Opt func(*config) @@ -21,8 +21,10 @@ func WithName(name string) Opt { return func(cfg *config) { cfg.name = name } } -func WithCallback(cancel context.CancelFunc) Opt { - return func(cfg *config) { cfg.callback = cancel } +// OnComplete is a callback function that is called +// when the background task is completed without an error +func OnComplete(fn func()) Opt { + return func(cfg *config) { cfg.onComplete = fn } } func WithLogger(logger zerolog.Logger) Opt { @@ -32,9 +34,9 @@ func WithLogger(logger zerolog.Logger) Opt { // Work emits a new task in the background func Work(ctx context.Context, f func(context.Context) error, opts ...Opt) { cfg := config{ - name: "", - logger: zerolog.Nop(), - callback: nil, + name: "", + logger: zerolog.Nop(), + onComplete: nil, } for _, opt := range opts { @@ -51,16 +53,25 @@ func Work(ctx context.Context, f func(context.Context) error, opts ...Opt) { if err := f(ctx); err != nil { logError(err, cfg, false) + return } - // Use cancel function if it is provided. - // This is used for stopping the main thread based on the outcome of the background task. - if cfg.callback != nil { - cfg.logger.Info().Msgf("background task completed for %s", cfg.name) - cfg.callback() + + if cfg.onComplete != nil { + cfg.onComplete() } + + cfg.logger.Info().Str("worker.name", cfg.getName()).Msg("Background task completed") }() } +func (c config) getName() string { + if c.name != "" { + return c.name + } + + return "unknown" +} + func logError(err error, cfg config, isPanic bool) { if err == nil { return @@ -83,10 +94,5 @@ func logError(err error, cfg config, isPanic bool) { evt.Bytes("stack_trace", buf) } - name := cfg.name - if name == "" { - name = "unknown" - } - - evt.Str("worker.name", name).Msg("Background task failed") + evt.Str("worker.name", cfg.getName()).Msg("Background task failed") } diff --git a/pkg/bg/bg_test.go b/pkg/bg/bg_test.go index 6bbcffd003..bba8180f16 100644 --- a/pkg/bg/bg_test.go +++ b/pkg/bg/bg_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "sync/atomic" "testing" "time" @@ -53,6 +54,35 @@ func TestWork(t *testing.T) { assert.JSONEq(t, expected, out.String()) }) + t.Run("with name and logger and onComplete", func(t *testing.T) { + // ARRANGE + // Given a logger + out := &bytes.Buffer{} + logger := zerolog.New(out) + check := int64(0) + + // And a call returning an error + call := func(ctx context.Context) error { + time.Sleep(100 * time.Millisecond) + return nil + } + + complete := func() { + atomic.AddInt64(&check, 1) + } + + // ACT + Work(ctx, call, WithName("hello"), WithLogger(logger), OnComplete(complete)) + time.Sleep(200 * time.Millisecond) + + // Check the log output + const expected = `{"level":"info", "message":"Background task completed", "worker.name":"hello"}` + assert.JSONEq(t, expected, out.String()) + + // Check onComplete + assert.Equal(t, int64(1), check) + }) + t.Run("panic recovery", func(t *testing.T) { // ARRANGE // Given a logger From 7c113a5059da6ce9e6e7a87dc4c1fcecaeacbfe8 Mon Sep 17 00:00:00 2001 From: Dmitry Date: Wed, 31 Jul 2024 18:16:52 +0200 Subject: [PATCH 15/20] Add maintenance package. Move TSS listener to maintenance --- cmd/zetaclientd/start.go | 30 ++- zetaclient/chains/interfaces/interfaces.go | 2 + zetaclient/context/app.go | 4 - zetaclient/maintenance/tss_listener.go | 165 +++++++++++++++ .../zetacore/client_tss_migration_listners.go | 194 ------------------ 5 files changed, 181 insertions(+), 214 deletions(-) create mode 100644 zetaclient/maintenance/tss_listener.go delete mode 100644 zetaclient/zetacore/client_tss_migration_listners.go diff --git a/cmd/zetaclientd/start.go b/cmd/zetaclientd/start.go index 3e53d20ca4..c4ad9b5485 100644 --- a/cmd/zetaclientd/start.go +++ b/cmd/zetaclientd/start.go @@ -27,6 +27,7 @@ import ( "github.com/zeta-chain/zetacore/zetaclient/chains/base" "github.com/zeta-chain/zetacore/zetaclient/config" zctx "github.com/zeta-chain/zetacore/zetaclient/context" + "github.com/zeta-chain/zetacore/zetaclient/maintenance" "github.com/zeta-chain/zetacore/zetaclient/metrics" "github.com/zeta-chain/zetacore/zetaclient/orchestrator" mc "github.com/zeta-chain/zetacore/zetaclient/tss" @@ -207,14 +208,6 @@ func start(_ *cobra.Command, _ []string) error { // Set P2P ID for telemetry telemetryServer.SetP2PID(server.GetLocalPeerID()) - // Create a notification context for background threads. - // These threads are responsible for sending shutdown signals to the main thread. - notifyCtx, cancel := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM) - - // Start background threads which monitors zeta core for state changes related to TSS migration - cancelBackgroundThreads := zetacoreClient.StartTssMigrationRoutines(notifyCtx, cancel, masterLogger) - defer cancelBackgroundThreads() - // Generate a new TSS if keygen is set and add it into the tss server // If TSS has already been generated, and keygen was successful ; we use the existing TSS err = GenerateTSS(ctx, masterLogger, zetacoreClient, server) @@ -349,18 +342,23 @@ func start(_ *cobra.Command, _ []string) error { // defer zetaSupplyChecker.Stop() //} - startLogger.Info().Msgf("zetaclientd is running") + startLogger.Info().Msgf("Zetaclientd is running") - <-notifyCtx.Done() + // Creating a channel to listen for os signals (or other signals) + signalChannel := make(chan os.Signal, 1) + signal.Notify(signalChannel, syscall.SIGINT, syscall.SIGTERM) - startLogger.Info().Msgf("initiating zetaclientd shut down") + // Maintenance workers ============ + maintenance.NewTSSListener(zetacoreClient, masterLogger).Listen(ctx, func() { + masterLogger.Info().Msg("TSS listener received an action to shutdown zetaclientd.") + signalChannel <- syscall.SIGTERM + }) + + sig := <-signalChannel + startLogger.Info().Msgf("Stop signal received: %q", sig) - //stop chain observers - for _, observer := range observerMap { - observer.Stop() - } zetacoreClient.Stop() - startLogger.Info().Msgf("zetaclientd stopped") + return nil } diff --git a/zetaclient/chains/interfaces/interfaces.go b/zetaclient/chains/interfaces/interfaces.go index 82125ca619..34e08adca6 100644 --- a/zetaclient/chains/interfaces/interfaces.go +++ b/zetaclient/chains/interfaces/interfaces.go @@ -107,6 +107,8 @@ type ZetacoreClient interface { GetKeys() keyinterfaces.ObserverKeys GetKeyGen(ctx context.Context) (*observertypes.Keygen, error) + GetTSS(ctx context.Context) (observertypes.TSS, error) + GetTSSHistory(ctx context.Context) ([]observertypes.TSS, error) GetBlockHeight(ctx context.Context) (int64, error) GetBlockHeaderChainState(ctx context.Context, chainID int64) (*lightclienttypes.ChainState, error) diff --git a/zetaclient/context/app.go b/zetaclient/context/app.go index dc6793f4d8..187500e781 100644 --- a/zetaclient/context/app.go +++ b/zetaclient/context/app.go @@ -62,10 +62,6 @@ func (a *AppContext) Config() config.Config { return a.config } -func (a *AppContext) Logger() zerolog.Logger { - return a.logger -} - // GetBTCChainAndConfig returns btc chain and config if enabled func (a *AppContext) GetBTCChainAndConfig() (chains.Chain, config.BTCConfig, bool) { cfg, configEnabled := a.Config().GetBTCConfig() diff --git a/zetaclient/maintenance/tss_listener.go b/zetaclient/maintenance/tss_listener.go new file mode 100644 index 0000000000..bb7b1fb050 --- /dev/null +++ b/zetaclient/maintenance/tss_listener.go @@ -0,0 +1,165 @@ +// Package maintenance provides maintenance functionalities for the zetaclient. +package maintenance + +import ( + "context" + "time" + + "cosmossdk.io/errors" + "github.com/rs/zerolog" + + "github.com/zeta-chain/zetacore/pkg/bg" + "github.com/zeta-chain/zetacore/pkg/retry" + observertypes "github.com/zeta-chain/zetacore/x/observer/types" + "github.com/zeta-chain/zetacore/zetaclient/chains/interfaces" +) + +const tssListenerTicker = 5 * time.Second + +// TSSListener is a struct that listens for TSS updates, new keygen, and new TSS key generation. +type TSSListener struct { + client interfaces.ZetacoreClient + logger zerolog.Logger +} + +// NewTSSListener creates a new TSSListener. +func NewTSSListener(client interfaces.ZetacoreClient, logger zerolog.Logger) *TSSListener { + log := logger.With().Str("module", "tss_listener").Logger() + + return &TSSListener{ + client: client, + logger: log, + } +} + +// Listen listens for any maintenance regarding TSS and calls action specified. Works in the background. +func (tl *TSSListener) Listen(ctx context.Context, action func()) { + var ( + withLogger = bg.WithLogger(tl.logger) + onComplete = bg.OnComplete(action) + ) + + bg.Work(ctx, tl.waitForUpdate, bg.WithName("tss.wait_for_update"), withLogger, onComplete) + bg.Work(ctx, tl.waitForNewKeyGeneration, bg.WithName("tss.wait_for_generation"), withLogger, onComplete) + bg.Work(ctx, tl.waitForNewKeygen, bg.WithName("tss.wait_for_keygen"), withLogger, onComplete) +} + +// waitForUpdate listens for TSS updates. Returns `nil` when the TSS address is updated +func (tl *TSSListener) waitForUpdate(ctx context.Context) error { + // Initial TSS retrieval + tss, err := retry.DoTypedWithBackoffAndRetry( + func() (observertypes.TSS, error) { return tl.client.GetTSS(ctx) }, + retry.DefaultConstantBackoff(), + ) + + if err != nil { + return errors.Wrap(err, "unable to get initial tss") + } + + ticker := time.NewTicker(tssListenerTicker) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + tssNew, err := tl.client.GetTSS(ctx) + if err != nil { + tl.logger.Warn().Err(err).Msg("unable to get new tss") + continue + } + + if tssNew.TssPubkey == tss.TssPubkey { + continue + } + + tl.logger.Info(). + Str("tss.old", tss.TssPubkey). + Str("tss.new", tssNew.TssPubkey). + Msgf("TSS address is updated") + + return nil + case <-ctx.Done(): + tl.logger.Info().Msg("waitForTSSUpdate stopped") + return nil + } + } +} + +// waitForNewKeyGeneration waits for new TSS key generation; it returns when a new key is generated +// It uses the length of the TSS list to determine if a new key is generated +func (tl *TSSListener) waitForNewKeyGeneration(ctx context.Context) error { + // Initial TSS history retrieval + tssHistoricalList, err := retry.DoTypedWithBackoffAndRetry( + func() ([]observertypes.TSS, error) { return tl.client.GetTSSHistory(ctx) }, + retry.DefaultConstantBackoff(), + ) + if err != nil { + return errors.Wrap(err, "failed to get initial tss history") + } + + tssLen := len(tssHistoricalList) + + ticker := time.NewTicker(tssListenerTicker) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + tssHistoricalListNew, err := tl.client.GetTSSHistory(ctx) + if err != nil { + continue + } + + tssLenUpdated := len(tssHistoricalListNew) + + if tssLenUpdated <= tssLen { + continue + } + + tl.logger.Info().Msgf("tss list updated from %d to %d", tssLen, tssLenUpdated) + return nil + case <-ctx.Done(): + tl.logger.Info().Msg("waitForNewKeyGeneration stopped") + return nil + } + } +} + +// waitForNewKeygen is a background thread that listens for new keygen; it returns when a new keygen is set +func (tl *TSSListener) waitForNewKeygen(ctx context.Context) error { + // Initial Keygen retrieval + keygen, err := retry.DoTypedWithBackoffAndRetry( + func() (*observertypes.Keygen, error) { return tl.client.GetKeyGen(ctx) }, + retry.DefaultConstantBackoff(), + ) + if err != nil { + return errors.Wrap(err, "failed to get initial tss history") + } + + ticker := time.NewTicker(tssListenerTicker) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + keygenUpdated, err := tl.client.GetKeyGen(ctx) + switch { + case err != nil: + tl.logger.Warn().Err(err).Msg("unable to get keygen") + continue + case keygenUpdated == nil: + continue + case keygenUpdated.Status == observertypes.KeygenStatus_PendingKeygen: + continue + case keygen.BlockNumber == keygenUpdated.BlockNumber: + continue + } + + tl.logger.Info().Msgf("got new keygen at block %d", keygen.BlockNumber) + return nil + case <-ctx.Done(): + tl.logger.Info().Msg("waitForNewKeygen stopped") + return nil + } + } +} diff --git a/zetaclient/zetacore/client_tss_migration_listners.go b/zetaclient/zetacore/client_tss_migration_listners.go deleted file mode 100644 index ffb59ad87b..0000000000 --- a/zetaclient/zetacore/client_tss_migration_listners.go +++ /dev/null @@ -1,194 +0,0 @@ -package zetacore - -import ( - "context" - "time" - - "cosmossdk.io/errors" - "github.com/rs/zerolog" - - "github.com/zeta-chain/zetacore/pkg/bg" - "github.com/zeta-chain/zetacore/pkg/retry" - observertypes "github.com/zeta-chain/zetacore/x/observer/types" - zctx "github.com/zeta-chain/zetacore/zetaclient/context" -) - -// startBackgroundRoutines: This function will start background threads. -// These threads are responsible for handling TSS updates, new keygen, and new TSS key generation. -// These threads are provided with a cancel function which is used to restart the main thread based on the outcome of the background task. -func (c *Client) StartTssMigrationRoutines( - ctx context.Context, - cancelFunc context.CancelFunc, - masterLogger zerolog.Logger, -) context.CancelFunc { - migrationRoutinesContext, cancel := context.WithCancel(ctx) - bg.Work( - migrationRoutinesContext, - c.HandleTSSUpdate, - bg.WithName("HandleTSSUpdate"), - bg.WithLogger(masterLogger), - bg.WithCallback(cancelFunc), - ) - bg.Work( - migrationRoutinesContext, - c.HandleNewKeygen, - bg.WithName("HandleNewKeygen"), - bg.WithLogger(masterLogger), - bg.WithCallback(cancelFunc), - ) - bg.Work( - migrationRoutinesContext, - c.HandleNewTSSKeyGeneration, - bg.WithName("HandleNewTSSKeyGeneration"), - bg.WithLogger(masterLogger), - bg.WithCallback(cancelFunc), - ) - return cancel -} - -// HandleTSSUpdate is a background thread that listens for TSS updates; it returns when the TSS address is updated -func (c *Client) HandleTSSUpdate(ctx context.Context) error { - appCtx, err := zctx.FromContext(ctx) - if err != nil { - return errors.Wrap(err, "failed to get app context") - } - - logger := appCtx.Logger().With().Str("module", "HandleTSSUpdate").Logger() - - // Initial TSS retrieval - tss, err := retry.DoTypedWithBackoffAndRetry[observertypes.TSS](func() (observertypes.TSS, error) { - return c.GetTSS(ctx) - }, retry.DefaultConstantBackoff()) - if err != nil { - logger.Warn().Err(err).Msg("unable to get initial tss") - return err - } - - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - { - tssNew, err := c.GetTSS(ctx) - if err != nil { - logger.Warn().Err(err).Msg("unable to get new tss") - continue - } - - if tssNew.TssPubkey == tss.TssPubkey { - continue - } - logger.Info().Msgf("tss address is updated from %s to %s", tss.TssPubkey, tssNew.TssPubkey) - return nil - } - case <-ctx.Done(): - { - logger.Info().Msg("HandleTSSUpdate stopped") - return nil - } - } - } -} - -// HandleNewTSSKeyGeneration is a background thread that listens for new TSS key generation; it returns when a new key is generated -// It uses the length of the TSS list to determine if a new key is generated -func (c *Client) HandleNewTSSKeyGeneration(ctx context.Context) error { - appCtx, err := zctx.FromContext(ctx) - if err != nil { - return errors.Wrap(err, "failed to get app context") - } - - logger := appCtx.Logger().With().Str("module", "HandleNewTSSKeyGeneration").Logger() - - // Initial TSS history retrieval - tssHistoricalList, err := retry.DoTypedWithBackoffAndRetry[[]observertypes.TSS]( - func() ([]observertypes.TSS, error) { - return c.GetTSSHistory(ctx) - }, - retry.DefaultConstantBackoff(), - ) - if err != nil { - return errors.Wrap(err, "failed to get initial tss history") - } - tssLen := len(tssHistoricalList) - - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - { - tssHistoricalListNew, err := c.GetTSSHistory(ctx) - if err != nil { - continue - } - tssLenUpdated := len(tssHistoricalListNew) - - if tssLenUpdated <= tssLen { - continue - } - logger.Info().Msgf("tss list updated from %d to %d", tssLen, tssLenUpdated) - return nil - } - case <-ctx.Done(): - { - logger.Info().Msg("HandleNewTSSKeyGeneration stopped") - return nil - } - } - } -} - -// HandleNewKeygen is a background thread that listens for new keygen; it returns when a new keygen is set -func (c *Client) HandleNewKeygen(ctx context.Context) error { - appCtx, err := zctx.FromContext(ctx) - if err != nil { - return err - } - logger := appCtx.Logger().With().Str("module", "HandleNewKeygen").Logger() - - // Initial Keygen retrieval - keygen, err := retry.DoTypedWithBackoffAndRetry[*observertypes.Keygen](func() (*observertypes.Keygen, error) { - return c.GetKeyGen(ctx) - }, retry.DefaultConstantBackoff()) - if err != nil { - return errors.Wrap(err, "failed to get initial tss history") - } - - ticker := time.NewTicker(5 * time.Second) - - for { - select { - case <-ticker.C: - { - keygenUpdated, err := c.GetKeyGen(ctx) - if err != nil { - logger.Warn().Err(err).Msg("unable to get keygen") - continue - } - if keygenUpdated == nil { - logger.Warn().Err(err).Msg("keygen is nil") - continue - } - if keygenUpdated.Status != observertypes.KeygenStatus_PendingKeygen { - continue - } - - if keygen.BlockNumber == keygenUpdated.BlockNumber { - continue - } - - logger.Info().Msgf("got new keygen at block %d", keygen.BlockNumber) - return nil - } - case <-ctx.Done(): - { - logger.Info().Msg("HandleNewKeygen stopped") - return nil - } - } - } -} From a665e679db2bbbebc121967c8221dc61ed6294eb Mon Sep 17 00:00:00 2001 From: Dmitry Date: Fri, 2 Aug 2024 16:31:36 +0200 Subject: [PATCH 16/20] Fix merge conflicts --- zetaclient/maintenance/tss_listener.go | 4 +- zetaclient/testutils/mocks/zetacore_client.go | 58 +++++++++++++++++++ 2 files changed, 59 insertions(+), 3 deletions(-) diff --git a/zetaclient/maintenance/tss_listener.go b/zetaclient/maintenance/tss_listener.go index bb7b1fb050..24c0f12b8b 100644 --- a/zetaclient/maintenance/tss_listener.go +++ b/zetaclient/maintenance/tss_listener.go @@ -129,7 +129,7 @@ func (tl *TSSListener) waitForNewKeyGeneration(ctx context.Context) error { func (tl *TSSListener) waitForNewKeygen(ctx context.Context) error { // Initial Keygen retrieval keygen, err := retry.DoTypedWithBackoffAndRetry( - func() (*observertypes.Keygen, error) { return tl.client.GetKeyGen(ctx) }, + func() (observertypes.Keygen, error) { return tl.client.GetKeyGen(ctx) }, retry.DefaultConstantBackoff(), ) if err != nil { @@ -147,8 +147,6 @@ func (tl *TSSListener) waitForNewKeygen(ctx context.Context) error { case err != nil: tl.logger.Warn().Err(err).Msg("unable to get keygen") continue - case keygenUpdated == nil: - continue case keygenUpdated.Status == observertypes.KeygenStatus_PendingKeygen: continue case keygen.BlockNumber == keygenUpdated.BlockNumber: diff --git a/zetaclient/testutils/mocks/zetacore_client.go b/zetaclient/testutils/mocks/zetacore_client.go index 168b580ada..5efe76aeba 100644 --- a/zetaclient/testutils/mocks/zetacore_client.go +++ b/zetaclient/testutils/mocks/zetacore_client.go @@ -496,6 +496,64 @@ func (_m *ZetacoreClient) GetRateLimiterInput(ctx context.Context, window int64) return r0, r1 } +// GetTSS provides a mock function with given fields: ctx +func (_m *ZetacoreClient) GetTSS(ctx context.Context) (observertypes.TSS, error) { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for GetTSS") + } + + var r0 observertypes.TSS + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (observertypes.TSS, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) observertypes.TSS); ok { + r0 = rf(ctx) + } else { + r0 = ret.Get(0).(observertypes.TSS) + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetTSSHistory provides a mock function with given fields: ctx +func (_m *ZetacoreClient) GetTSSHistory(ctx context.Context) ([]observertypes.TSS, error) { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for GetTSSHistory") + } + + var r0 []observertypes.TSS + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) ([]observertypes.TSS, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) []observertypes.TSS); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]observertypes.TSS) + } + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // GetZetaHotKeyBalance provides a mock function with given fields: ctx func (_m *ZetacoreClient) GetZetaHotKeyBalance(ctx context.Context) (math.Int, error) { ret := _m.Called(ctx) From b93f0671b12f911d1c66c4d715bf744696f15943 Mon Sep 17 00:00:00 2001 From: Tanmay Date: Fri, 2 Aug 2024 14:40:53 -0400 Subject: [PATCH 17/20] fix tss migration test --- cmd/zetaclientd/start.go | 20 ++++++++++---------- zetaclient/maintenance/tss_listener.go | 4 +++- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/cmd/zetaclientd/start.go b/cmd/zetaclientd/start.go index c4ad9b5485..18088002ea 100644 --- a/cmd/zetaclientd/start.go +++ b/cmd/zetaclientd/start.go @@ -208,6 +208,16 @@ func start(_ *cobra.Command, _ []string) error { // Set P2P ID for telemetry telemetryServer.SetP2PID(server.GetLocalPeerID()) + // Creating a channel to listen for os signals (or other signals) + signalChannel := make(chan os.Signal, 1) + signal.Notify(signalChannel, syscall.SIGINT, syscall.SIGTERM) + + // Maintenance workers ============ + maintenance.NewTSSListener(zetacoreClient, masterLogger).Listen(ctx, func() { + masterLogger.Info().Msg("TSS listener received an action to shutdown zetaclientd.") + signalChannel <- syscall.SIGTERM + }) + // Generate a new TSS if keygen is set and add it into the tss server // If TSS has already been generated, and keygen was successful ; we use the existing TSS err = GenerateTSS(ctx, masterLogger, zetacoreClient, server) @@ -344,16 +354,6 @@ func start(_ *cobra.Command, _ []string) error { startLogger.Info().Msgf("Zetaclientd is running") - // Creating a channel to listen for os signals (or other signals) - signalChannel := make(chan os.Signal, 1) - signal.Notify(signalChannel, syscall.SIGINT, syscall.SIGTERM) - - // Maintenance workers ============ - maintenance.NewTSSListener(zetacoreClient, masterLogger).Listen(ctx, func() { - masterLogger.Info().Msg("TSS listener received an action to shutdown zetaclientd.") - signalChannel <- syscall.SIGTERM - }) - sig := <-signalChannel startLogger.Info().Msgf("Stop signal received: %q", sig) diff --git a/zetaclient/maintenance/tss_listener.go b/zetaclient/maintenance/tss_listener.go index bb7b1fb050..86ffc8c87d 100644 --- a/zetaclient/maintenance/tss_listener.go +++ b/zetaclient/maintenance/tss_listener.go @@ -149,7 +149,9 @@ func (tl *TSSListener) waitForNewKeygen(ctx context.Context) error { continue case keygenUpdated == nil: continue - case keygenUpdated.Status == observertypes.KeygenStatus_PendingKeygen: + case keygenUpdated.Status == observertypes.KeygenStatus_KeyGenSuccess || keygenUpdated.Status == observertypes.KeygenStatus_KeyGenFailed: + continue + case keygenUpdated.Status == observertypes.KeygenStatus_PendingKeygen && keygenUpdated.BlockNumber <= keygen.BlockNumber: continue case keygen.BlockNumber == keygenUpdated.BlockNumber: continue From 42a57a7bab485c0c58feeb61097af76114c73004 Mon Sep 17 00:00:00 2001 From: Tanmay Date: Fri, 2 Aug 2024 14:53:08 -0400 Subject: [PATCH 18/20] add structured logging --- zetaclient/maintenance/tss_listener.go | 29 ++++++++++++++++++-------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/zetaclient/maintenance/tss_listener.go b/zetaclient/maintenance/tss_listener.go index 86ffc8c87d..d97d56936a 100644 --- a/zetaclient/maintenance/tss_listener.go +++ b/zetaclient/maintenance/tss_listener.go @@ -67,7 +67,7 @@ func (tl *TSSListener) waitForUpdate(ctx context.Context) error { tl.logger.Warn().Err(err).Msg("unable to get new tss") continue } - + // If the TSS address is not updated, continue loop if tssNew.TssPubkey == tss.TssPubkey { continue } @@ -111,12 +111,15 @@ func (tl *TSSListener) waitForNewKeyGeneration(ctx context.Context) error { } tssLenUpdated := len(tssHistoricalListNew) - + // New tss key has not been added to list , continue loop if tssLenUpdated <= tssLen { continue } - tl.logger.Info().Msgf("tss list updated from %d to %d", tssLen, tssLenUpdated) + tl.logger.Info(). + Int("tssLen", tssLen). + Int("tssLenUpdated", tssLenUpdated). + Msg("tss list updated") return nil case <-ctx.Done(): tl.logger.Info().Msg("waitForNewKeyGeneration stopped") @@ -129,7 +132,7 @@ func (tl *TSSListener) waitForNewKeyGeneration(ctx context.Context) error { func (tl *TSSListener) waitForNewKeygen(ctx context.Context) error { // Initial Keygen retrieval keygen, err := retry.DoTypedWithBackoffAndRetry( - func() (*observertypes.Keygen, error) { return tl.client.GetKeyGen(ctx) }, + func() (observertypes.Keygen, error) { return tl.client.GetKeyGen(ctx) }, retry.DefaultConstantBackoff(), ) if err != nil { @@ -147,17 +150,25 @@ func (tl *TSSListener) waitForNewKeygen(ctx context.Context) error { case err != nil: tl.logger.Warn().Err(err).Msg("unable to get keygen") continue - case keygenUpdated == nil: + // Keygen is not pending it has already been successfully generated, continue loop + case keygenUpdated.Status == observertypes.KeygenStatus_KeyGenSuccess: continue - case keygenUpdated.Status == observertypes.KeygenStatus_KeyGenSuccess || keygenUpdated.Status == observertypes.KeygenStatus_KeyGenFailed: + // Keygen failed we to need to wait until a new keygen is set, continue loop + case keygenUpdated.Status == observertypes.KeygenStatus_KeyGenFailed: continue + // Keygen is pending but block number is not updated, continue loop. + // Most likely the zetaclient is waiting for the keygen block to arrive. case keygenUpdated.Status == observertypes.KeygenStatus_PendingKeygen && keygenUpdated.BlockNumber <= keygen.BlockNumber: continue - case keygen.BlockNumber == keygenUpdated.BlockNumber: - continue } - tl.logger.Info().Msgf("got new keygen at block %d", keygen.BlockNumber) + // Trigger restart only when the following conditions are met: + // 1. Keygen is pending + // 2. Block number is updated + + tl.logger.Info(). + Int64("blockNumber", keygenUpdated.BlockNumber). + Msg("got new keygen") return nil case <-ctx.Done(): tl.logger.Info().Msg("waitForNewKeygen stopped") From 1c132de151314a0ef8a7380c7140681d4985486c Mon Sep 17 00:00:00 2001 From: Tanmay Date: Tue, 6 Aug 2024 10:28:33 -0400 Subject: [PATCH 19/20] remove check fro nonce 0 --- zetaclient/chains/bitcoin/observer/outbound.go | 3 ++- zetaclient/maintenance/tss_listener.go | 5 +---- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/zetaclient/chains/bitcoin/observer/outbound.go b/zetaclient/chains/bitcoin/observer/outbound.go index 009a49759e..27e7537874 100644 --- a/zetaclient/chains/bitcoin/observer/outbound.go +++ b/zetaclient/chains/bitcoin/observer/outbound.go @@ -151,7 +151,8 @@ func (ob *Observer) VoteOutboundIfConfirmed( // prevents double spending of same UTXO. However, for nonce 0, we don't have a prior nonce (e.g., -1) // for the signer to check against when making the payment. Signer treats nonce 0 as a special case in downstream code. if nonce == 0 { - return false, nil + ob.logger.Outbound.Info().Msgf("VoteOutboundIfConfirmed: outbound %s is nonce 0", outboundID) + return true, nil } // Try including this outbound broadcasted by myself diff --git a/zetaclient/maintenance/tss_listener.go b/zetaclient/maintenance/tss_listener.go index d97d56936a..e42ead57c4 100644 --- a/zetaclient/maintenance/tss_listener.go +++ b/zetaclient/maintenance/tss_listener.go @@ -131,10 +131,7 @@ func (tl *TSSListener) waitForNewKeyGeneration(ctx context.Context) error { // waitForNewKeygen is a background thread that listens for new keygen; it returns when a new keygen is set func (tl *TSSListener) waitForNewKeygen(ctx context.Context) error { // Initial Keygen retrieval - keygen, err := retry.DoTypedWithBackoffAndRetry( - func() (observertypes.Keygen, error) { return tl.client.GetKeyGen(ctx) }, - retry.DefaultConstantBackoff(), - ) + keygen, err := tl.client.GetKeyGen(ctx) if err != nil { return errors.Wrap(err, "failed to get initial tss history") } From b5c0d7711aa299e7129c82af9303debc1f4b5f57 Mon Sep 17 00:00:00 2001 From: Charlie Chen Date: Tue, 6 Aug 2024 11:24:48 -0500 Subject: [PATCH 20/20] fix OutboundID generation to make the identifier more unique --- zetaclient/chains/base/observer.go | 8 +++- zetaclient/chains/base/observer_test.go | 42 +++++++++++++++++++ .../chains/bitcoin/observer/outbound.go | 2 +- 3 files changed, 50 insertions(+), 2 deletions(-) diff --git a/zetaclient/chains/base/observer.go b/zetaclient/chains/base/observer.go index 428946f0bf..6021b9219e 100644 --- a/zetaclient/chains/base/observer.go +++ b/zetaclient/chains/base/observer.go @@ -268,8 +268,14 @@ func (ob *Observer) WithHeaderCache(cache *lru.Cache) *Observer { } // OutboundID returns a unique identifier for the outbound transaction. +// The identifier is now used as the key for maps that store outbound related data (e.g. transaction, receipt, etc). func (ob *Observer) OutboundID(nonce uint64) string { - return fmt.Sprintf("%d-%d", ob.chain.ChainId, nonce) + // all chains uses EVM address as part of the key except bitcoin + tssAddress := ob.tss.EVMAddress().String() + if ob.chain.Consensus == chains.Consensus_bitcoin { + tssAddress = ob.tss.BTCAddress() + } + return fmt.Sprintf("%d-%s-%d", ob.chain.ChainId, tssAddress, nonce) } // DB returns the database for the observer. diff --git a/zetaclient/chains/base/observer_test.go b/zetaclient/chains/base/observer_test.go index b40802c0a7..50148a551b 100644 --- a/zetaclient/chains/base/observer_test.go +++ b/zetaclient/chains/base/observer_test.go @@ -2,6 +2,7 @@ package base_test import ( "context" + "fmt" "os" "testing" @@ -248,6 +249,47 @@ func TestObserverGetterAndSetter(t *testing.T) { }) } +func TestOutboundID(t *testing.T) { + tests := []struct { + name string + chain chains.Chain + tss interfaces.TSSSigner + nonce uint64 + }{ + { + name: "should get correct outbound id for Ethereum chain", + chain: chains.Ethereum, + tss: mocks.NewTSSMainnet(), + nonce: 100, + }, + { + name: "should get correct outbound id for Bitcoin chain", + chain: chains.BitcoinMainnet, + tss: mocks.NewTSSMainnet(), + nonce: 200, + }, + } + + // run tests + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // create observer + ob := createObserver(t, tt.chain) + ob = ob.WithTSS(tt.tss) + + // get outbound id + outboundID := ob.OutboundID(tt.nonce) + + // expected outbound id + exepctedID := fmt.Sprintf("%d-%s-%d", tt.chain.ChainId, tt.tss.EVMAddress(), tt.nonce) + if tt.chain.Consensus == chains.Consensus_bitcoin { + exepctedID = fmt.Sprintf("%d-%s-%d", tt.chain.ChainId, tt.tss.BTCAddress(), tt.nonce) + } + require.Equal(t, exepctedID, outboundID) + }) + } +} + func TestLoadLastBlockScanned(t *testing.T) { chain := chains.Ethereum envvar := base.EnvVarLatestBlockByChain(chain) diff --git a/zetaclient/chains/bitcoin/observer/outbound.go b/zetaclient/chains/bitcoin/observer/outbound.go index 27e7537874..55b56f7a83 100644 --- a/zetaclient/chains/bitcoin/observer/outbound.go +++ b/zetaclient/chains/bitcoin/observer/outbound.go @@ -152,7 +152,7 @@ func (ob *Observer) VoteOutboundIfConfirmed( // for the signer to check against when making the payment. Signer treats nonce 0 as a special case in downstream code. if nonce == 0 { ob.logger.Outbound.Info().Msgf("VoteOutboundIfConfirmed: outbound %s is nonce 0", outboundID) - return true, nil + return false, nil } // Try including this outbound broadcasted by myself