Skip to content

Commit

Permalink
Apply graceful package to start.go
Browse files Browse the repository at this point in the history
  • Loading branch information
swift1337 committed Dec 12, 2024
1 parent 6df29a3 commit 6f43ea0
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 89 deletions.
120 changes: 50 additions & 70 deletions cmd/zetaclientd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,18 @@ import (
"net/http"
_ "net/http/pprof" // #nosec G108 -- pprof enablement is intentional
"os"
"os/signal"
"syscall"

"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"

"github.com/zeta-chain/node/pkg/chains"
"github.com/zeta-chain/node/pkg/constant"
"github.com/zeta-chain/node/pkg/graceful"
zetaos "github.com/zeta-chain/node/pkg/os"
"github.com/zeta-chain/node/zetaclient/chains/base"
"github.com/zeta-chain/node/zetaclient/config"
zctx "github.com/zeta-chain/node/zetaclient/context"
"github.com/zeta-chain/node/zetaclient/keys"
"github.com/zeta-chain/node/zetaclient/maintenance"
"github.com/zeta-chain/node/zetaclient/metrics"
"github.com/zeta-chain/node/zetaclient/orchestrator"
Expand Down Expand Up @@ -59,28 +56,10 @@ func Start(_ *cobra.Command, _ []string) error {
appContext := zctx.New(cfg, passes.relayerKeys(), logger.Std)
ctx := zctx.WithAppContext(context.Background(), appContext)

// TODO graceful
telemetryServer := metrics.NewTelemetryServer()
go func() {
err := telemetryServer.Start()
if err != nil {
log.Fatal().Err(err).Msg("telemetryServer error")
}
}()

m, err := metrics.NewMetrics()
telemetry, err := startTelemetry(ctx, cfg)
if err != nil {
return errors.Wrap(err, "unable to create metrics")
return errors.Wrap(err, "unable to start telemetry")
}
m.Start()

metrics.Info.WithLabelValues(constant.Version).Set(1)
metrics.LastStartTime.SetToCurrentTime()

telemetryServer.SetIPAddress(cfg.PublicIP)

// TODO graceful
go runPprof(logger.Std)

// zetacore client is used for all communication to zeta node.
// it accumulates votes, and provides a source of truth for all clients
Expand Down Expand Up @@ -118,24 +97,19 @@ func Start(_ *cobra.Command, _ []string) error {
TSSKeyPassword: passes.tss,
BitcoinChainIDs: btcChainIDsFromContext(appContext),
PostBlame: isEnvFlagEnabled(envFlagPostBlame),
Telemetry: telemetryServer,
Telemetry: telemetry,
}

tss, err := zetatss.Setup(ctx, tssSetupProps, logger.Std)
if err != nil {
return errors.Wrap(err, "unable to setup TSS service")
}

// Creating a channel to listen for os signals (or other signals)
// TODO graceful
signalChannel := make(chan os.Signal, 1)
signal.Notify(signalChannel, syscall.SIGINT, syscall.SIGTERM)

// Starts various background TSS listeners.
// Shuts down zetaclientd if any is triggered.
maintenance.NewTSSListener(zetacoreClient, logger.Std).Listen(ctx, func() {
logger.Std.Info().Msg("TSS listener received an action to shutdown zetaclientd.")
signalChannel <- syscall.SIGTERM
graceful.ShutdownNow()
})

// CreateSignerMap: This creates a map of all signers for each chain.
Expand All @@ -148,7 +122,7 @@ func Start(_ *cobra.Command, _ []string) error {

// Creates a map of all chain observers for each chain.
// Each chain observer is responsible for observing events on the chain and processing them.
observerMap, err := orchestrator.CreateChainObserverMap(ctx, zetacoreClient, tss, dbPath, logger, telemetryServer)
observerMap, err := orchestrator.CreateChainObserverMap(ctx, zetacoreClient, tss, dbPath, logger, telemetry)
if err != nil {
return errors.Wrap(err, "unable to create chain observer map")
}
Expand All @@ -164,55 +138,21 @@ func Start(_ *cobra.Command, _ []string) error {
tss,
dbPath,
logger,
telemetryServer,
telemetry,
)
if err != nil {
return errors.Wrap(err, "unable to create orchestrator")
}

// Start orchestrator with all observers and signers
if err = maestro.Start(ctx); err != nil {
return errors.Wrap(err, "unable to start orchestrator")
}

log.Info().Msg("zetaclientd is running")
graceful.AddService(ctx, maestro)

// todo graceful
sig := <-signalChannel
log.Info().Msgf("Stop signal received: %q. Stopping zetaclientd", sig)

maestro.Stop()
// Block current routine until a shutdown signal is received
graceful.WaitForShutdown()

return nil
}

func resolveObserverPubKeyBech32(cfg config.Config, hotKeyPassword string) (string, error) {
// Get observer's public key ("grantee pub key")
_, granteePubKeyBech32, err := keys.GetKeyringKeybase(cfg, hotKeyPassword)
if err != nil {
return "", errors.Wrap(err, "unable to get keyring key base")
}

return granteePubKeyBech32, nil
}

// runPprof run pprof http server
// zetacored/cometbft is already listening for runPprof on 6060 (by default)
func runPprof(logger zerolog.Logger) {
addr := os.Getenv(envPprofAddr)
if addr == "" {
addr = "localhost:6061"
}

logger.Info().Str("addr", addr).Msg("starting pprof http server")

// #nosec G114 -- timeouts unneeded
err := http.ListenAndServe(addr, nil)
if err != nil {
logger.Error().Err(err).Msg("pprof http server error")
}
}

type passwords struct {
hotkey string
tss string
Expand Down Expand Up @@ -240,3 +180,43 @@ func (p passwords) relayerKeys() map[string]string {
chains.Network_solana.String(): p.solanaRelayerKey,
}
}

func startTelemetry(ctx context.Context, cfg config.Config) (*metrics.TelemetryServer, error) {
// 1. Init pprof http server
pprofServer := func(_ context.Context) error {
addr := os.Getenv(envPprofAddr)
if addr == "" {
addr = "localhost:6061"
}

log.Info().Str("addr", addr).Msg("starting pprof http server")

// #nosec G114 -- timeouts unneeded
err := http.ListenAndServe(addr, nil)
if err != nil {
log.Error().Err(err).Msg("pprof http server error")
}

return nil
}

// 2. Init metrics server
metricsServer, err := metrics.NewMetrics()
if err != nil {
return nil, errors.Wrap(err, "unable to create metrics")
}

metrics.Info.WithLabelValues(constant.Version).Set(1)
metrics.LastStartTime.SetToCurrentTime()

// 3. Init telemetry server
telemetry := metrics.NewTelemetryServer()
telemetry.SetIPAddress(cfg.PublicIP)

// 4. Add services to the process
graceful.AddStarter(ctx, pprofServer)
graceful.AddService(ctx, metricsServer)
graceful.AddService(ctx, telemetry)

return telemetry, nil
}
10 changes: 10 additions & 0 deletions cmd/zetaclientd/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,13 @@ func btcChainIDsFromContext(app *zctx.AppContext) []int64 {

return btcChainIDs
}

func resolveObserverPubKeyBech32(cfg config.Config, hotKeyPassword string) (string, error) {
// Get observer's public key ("grantee pub key")
_, granteePubKeyBech32, err := keys.GetKeyringKeybase(cfg, hotKeyPassword)
if err != nil {
return "", errors.Wrap(err, "unable to get keyring key base")
}

return granteePubKeyBech32, nil
}
28 changes: 27 additions & 1 deletion pkg/graceful/graceful.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ import (
"os"
"os/signal"
"sync"
"syscall"
"time"

"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)

// Process represents "virtual" process that contains
Expand All @@ -34,6 +36,9 @@ type Service interface {
Stop()
}

// DefaultProcess is a process instance with some sane defaults.
var DefaultProcess = New(15*time.Second, log.Logger, NewSigChan(syscall.SIGINT, syscall.SIGTERM))

// New Process constructor.
func New(timeout time.Duration, logger zerolog.Logger, stop <-chan os.Signal) *Process {
return &Process{
Expand Down Expand Up @@ -82,7 +87,8 @@ func (p *Process) WaitForShutdown() {

for {
select {
case <-p.stop:
case sig := <-p.stop:
p.logger.Info().Msgf("Received signal: %q", sig.String())
p.ShutdownNow()
return
case <-t.C:
Expand Down Expand Up @@ -145,3 +151,23 @@ func NewSigChan(signals ...os.Signal) chan os.Signal {

return out
}

func AddService(ctx context.Context, s Service) {
DefaultProcess.AddService(ctx, s)
}

func AddStarter(ctx context.Context, fn func(ctx context.Context) error) {
DefaultProcess.AddStarter(ctx, fn)
}

func AddStopper(fn func()) {
DefaultProcess.AddStopper(fn)
}

func WaitForShutdown() {
DefaultProcess.WaitForShutdown()
}

func ShutdownNow() {
DefaultProcess.ShutdownNow()
}
2 changes: 1 addition & 1 deletion zetaclient/chains/base/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestInitLogger(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// init logger
logger, err := base.InitLogger(tt.cfg)
logger, err := base.NewLogger(tt.cfg)

// check if error is expected
if tt.fail {
Expand Down
23 changes: 14 additions & 9 deletions zetaclient/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/url"
"time"

"cosmossdk.io/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
Expand Down Expand Up @@ -186,20 +187,24 @@ func NewMetrics() (*Metrics, error) {
}

// Start starts the metrics server
func (m *Metrics) Start() {
func (m *Metrics) Start(_ context.Context) error {
log.Info().Msg("metrics server starting")
go func() {
if err := m.s.ListenAndServe(); err != nil {
log.Error().Err(err).Msg("fail to start metric server")
}
}()

if err := m.s.ListenAndServe(); err != nil {
return errors.Wrap(err, "fail to start metric server")
}

return nil
}

// Stop stops the metrics server
func (m *Metrics) Stop() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
func (m *Metrics) Stop() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return m.s.Shutdown(ctx)

if err := m.s.Shutdown(ctx); err != nil {
log.Error().Err(err).Msg("failed to shutdown metrics server")
}
}

// GetInstrumentedHTTPClient sets up a http client that emits prometheus metrics
Expand Down
3 changes: 2 additions & 1 deletion zetaclient/metrics/metrics_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package metrics

import (
"context"
"fmt"
"io"
"net/http"
Expand All @@ -25,7 +26,7 @@ var _ = Suite(&MetricsSuite{})
func (ms *MetricsSuite) SetUpSuite(c *C) {
m, err := NewMetrics()
c.Assert(err, IsNil)
m.Start()
go m.Start(context.Background())
ms.m = m
}

Expand Down
14 changes: 7 additions & 7 deletions zetaclient/metrics/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,11 @@ func (t *TelemetryServer) Handlers() http.Handler {
}

// Start starts telemetry server
func (t *TelemetryServer) Start() error {
func (t *TelemetryServer) Start(_ context.Context) error {
if t.s == nil {
return errors.New("invalid http server instance")
}

if err := t.s.ListenAndServe(); err != nil {
if !errors.Is(err, http.ErrServerClosed) {
return fmt.Errorf("fail to start http server: %w", err)
Expand All @@ -212,14 +213,13 @@ func (t *TelemetryServer) Start() error {
}

// Stop stops telemetry server
func (t *TelemetryServer) Stop() error {
c, cancel := context.WithTimeout(context.Background(), 10*time.Second)
func (t *TelemetryServer) Stop() {
c, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err := t.s.Shutdown(c)
if err != nil {
log.Error().Err(err).Msg("Failed to shutdown the HTTP server gracefully")

if err := t.s.Shutdown(c); err != nil {
log.Error().Err(err).Msg("Failed to shutdown the TelemetryServer")
}
return err
}

// pingHandler returns a 200 OK response
Expand Down

0 comments on commit 6f43ea0

Please sign in to comment.