diff --git a/go.mod b/go.mod index 68f44d9dad..0e356d652d 100644 --- a/go.mod +++ b/go.mod @@ -44,6 +44,7 @@ require ( github.com/tendermint/tm-db v0.6.7 golang.org/x/crypto v0.24.0 golang.org/x/exp v0.0.0-20240604190554-fc45aab8b7f8 + golang.org/x/sync v0.7.0 google.golang.org/genproto/googleapis/api v0.0.0-20240604185151-ef581f913117 google.golang.org/grpc v1.64.0 google.golang.org/protobuf v1.34.1 diff --git a/server/api/server.go b/server/api/server.go index f49c903640..bc2fad65e3 100644 --- a/server/api/server.go +++ b/server/api/server.go @@ -1,6 +1,7 @@ package api import ( + "context" "fmt" "net" "net/http" @@ -27,9 +28,9 @@ type Server struct { Router *mux.Router GRPCGatewayRouter *runtime.ServeMux ClientCtx client.Context + logger log.Logger + metrics *telemetry.Metrics - logger log.Logger - metrics *telemetry.Metrics // Start() is blocking and generally called from a separate goroutine. // Close() can be called asynchronously and access shared memory // via the listener. Therefore, we sync access to Start and Close with @@ -85,7 +86,12 @@ func New(clientCtx client.Context, logger log.Logger) *Server { // JSON RPC server. Configuration options are provided via config.APIConfig // and are delegated to the Tendermint JSON RPC server. The process is // non-blocking, so an external signal handler must be used. -func (s *Server) Start(cfg config.Config) error { +// and are delegated to the CometBFT JSON RPC server. +// +// Note, this creates a blocking process if the server is started successfully. +// Otherwise, an error is returned. The caller is expected to provide a Context +// that is properly canceled or closed to indicate the server should be stopped. +func (s *Server) Start(ctx context.Context, cfg config.Config) error { s.mtx.Lock() tmCfg := tmrpcserver.DefaultConfig() @@ -107,13 +113,35 @@ func (s *Server) Start(cfg config.Config) error { // register grpc-gateway routes s.Router.PathPrefix("/").Handler(s.GRPCGatewayRouter) - s.logger.Info("starting API server...") - if cfg.API.EnableUnsafeCORS { - allowAllCORS := handlers.CORS(handlers.AllowedHeaders([]string{"Content-Type"})) - return tmrpcserver.Serve(s.listener, allowAllCORS(s.Router), s.logger, tmCfg) - } + errCh := make(chan error) + + // Start the API in an external goroutine as Serve is blocking and will return + // an error upon failure, which we'll send on the error channel that will be + // consumed by the for block below. + go func(enableUnsafeCORS bool) { + s.logger.Info("starting API server...", "address", cfg.API.Address) - return tmrpcserver.Serve(s.listener, s.Router, s.logger, tmCfg) + if enableUnsafeCORS { + allowAllCORS := handlers.CORS(handlers.AllowedHeaders([]string{"Content-Type"})) + errCh <- tmrpcserver.Serve(s.listener, allowAllCORS(s.Router), s.logger, tmCfg) + } else { + errCh <- tmrpcserver.Serve(s.listener, s.Router, s.logger, tmCfg) + } + }(cfg.API.EnableUnsafeCORS) + + // Start a blocking select to wait for an indication to stop the server or that + // the server failed to start properly. + select { + case <-ctx.Done(): + // The calling process canceled or closed the provided context, so we must + // gracefully stop the API server. + s.logger.Info("stopping API server...", "address", cfg.API.Address) + return s.Close() + + case err := <-errCh: + s.logger.Error("failed to start API server", "err", err) + return err + } } // Close closes the API server. diff --git a/server/grpc/server.go b/server/grpc/server.go index 0e2877b53a..d51c9effec 100644 --- a/server/grpc/server.go +++ b/server/grpc/server.go @@ -1,10 +1,12 @@ package grpc import ( + "context" "fmt" + sdk "github.com/Finschia/finschia-sdk/types" "net" - "time" + "github.com/tendermint/tendermint/libs/log" "google.golang.org/grpc" "github.com/Finschia/finschia-sdk/client" @@ -13,11 +15,11 @@ import ( "github.com/Finschia/finschia-sdk/server/grpc/gogoreflection" reflection "github.com/Finschia/finschia-sdk/server/grpc/reflection/v2" "github.com/Finschia/finschia-sdk/server/types" - sdk "github.com/Finschia/finschia-sdk/types" ) -// StartGRPCServer starts a gRPC server on the given address. -func StartGRPCServer(clientCtx client.Context, app types.Application, cfg config.GRPCConfig) (*grpc.Server, error) { +// NewGRPCServer returns a correctly configured and initialized gRPC server. +// Note, the caller is responsible for starting the server. See StartGRPCServer. +func NewGRPCServer(clientCtx client.Context, app types.Application, cfg config.GRPCConfig) (*grpc.Server, error) { maxSendMsgSize := cfg.MaxSendMsgSize if maxSendMsgSize == 0 { maxSendMsgSize = config.DefaultGRPCMaxSendMsgSize @@ -29,10 +31,11 @@ func StartGRPCServer(clientCtx client.Context, app types.Application, cfg config } grpcSrv := grpc.NewServer( + grpc.ForceServerCodec(codec.NewProtoCodec(clientCtx.InterfaceRegistry).GRPCCodec()), grpc.MaxSendMsgSize(maxSendMsgSize), grpc.MaxRecvMsgSize(maxRecvMsgSize), - grpc.ForceServerCodec(codec.NewProtoCodec(clientCtx.InterfaceRegistry).GRPCCodec()), ) + app.RegisterGRPCServer(grpcSrv) // Reflection allows consumers to build dynamic clients that can write to any @@ -51,30 +54,50 @@ func StartGRPCServer(clientCtx client.Context, app types.Application, cfg config InterfaceRegistry: clientCtx.InterfaceRegistry, }) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to register reflection service: %w", err) } + // Reflection allows external clients to see what services and methods // the gRPC server exposes. gogoreflection.Register(grpcSrv) + return grpcSrv, nil +} + +// StartGRPCServer starts the provided gRPC server on the address specified in cfg. +// +// Note, this creates a blocking process if the server is started successfully. +// Otherwise, an error is returned. The caller is expected to provide a Context +// that is properly canceled or closed to indicate the server should be stopped. +func StartGRPCServer(ctx context.Context, logger log.Logger, cfg config.GRPCConfig, grpcSrv *grpc.Server) error { listener, err := net.Listen("tcp", cfg.Address) if err != nil { - return nil, err + return fmt.Errorf("failed to listen on address %s: %w", cfg.Address, err) } errCh := make(chan error) + + // Start the gRPC in an external goroutine as Serve is blocking and will return + // an error upon failure, which we'll send on the error channel that will be + // consumed by the for block below. go func() { - err = grpcSrv.Serve(listener) - if err != nil { - errCh <- fmt.Errorf("failed to serve: %w", err) - } + logger.Info("starting gRPC server...", "address", cfg.Address) + errCh <- grpcSrv.Serve(listener) }() + // Start a blocking select to wait for an indication to stop the server or that + // the server failed to start properly. select { + case <-ctx.Done(): + // The calling process canceled or closed the provided context, so we must + // gracefully stop the gRPC server. + logger.Info("stopping gRPC server...", "address", cfg.Address) + grpcSrv.GracefulStop() + + return nil + case err := <-errCh: - return nil, err - case <-time.After(types.ServerStartTime): - // assume server started successfully - return grpcSrv, nil + logger.Error("failed to start gRPC server", "err", err) + return err } } diff --git a/server/start.go b/server/start.go index 6e11863c92..ceca66df62 100644 --- a/server/start.go +++ b/server/start.go @@ -3,23 +3,28 @@ package server // DONTCOVER import ( + "context" "fmt" + "io" "net" "os" "runtime/pprof" "strings" - "time" + "github.com/hashicorp/go-metrics" "github.com/spf13/cobra" "github.com/tendermint/tendermint/abci/server" tmcmd "github.com/tendermint/tendermint/cmd/tendermint/commands" - "github.com/tendermint/tendermint/config" - tmos "github.com/tendermint/tendermint/libs/os" + tmcfg "github.com/tendermint/tendermint/config" + tmlog "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/node" "github.com/tendermint/tendermint/p2p" pvm "github.com/tendermint/tendermint/privval" "github.com/tendermint/tendermint/proxy" + rpchttp "github.com/tendermint/tendermint/rpc/client/http" "github.com/tendermint/tendermint/rpc/client/local" + tmtypes "github.com/tendermint/tendermint/types" + "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -34,6 +39,7 @@ import ( "github.com/Finschia/finschia-sdk/store/iavl" storetypes "github.com/Finschia/finschia-sdk/store/types" "github.com/Finschia/finschia-sdk/telemetry" + "github.com/Finschia/finschia-sdk/version" ) // Tendermint full-node start flags @@ -118,29 +124,49 @@ is performed. Note, when enabled, gRPC will also be automatically enabled. return err }, RunE: func(cmd *cobra.Command, _ []string) error { - serverCtx := GetServerContextFromCmd(cmd) + svrCtx := GetServerContextFromCmd(cmd) clientCtx, err := client.GetClientQueryContext(cmd) if err != nil { return err } - withTM, _ := cmd.Flags().GetBool(flagWithTendermint) - if !withTM { - serverCtx.Logger.Info("starting ABCI without Tendermint") - return startStandAlone(serverCtx, appCreator) + svrCfg, err := getAndValidateConfig(svrCtx) + if err != nil { + return err } - serverCtx.Logger.Info("starting ABCI with Tendermint") + tmetrics, err := startTelemetry(svrCfg) + if err != nil { + return err + } + + emitServerInfoMetrics() - // amino is needed here for backwards compatibility of REST routes - err = startInProcess(serverCtx, clientCtx, appCreator) - errCode, ok := err.(ErrorCode) - if !ok { + db, err := openDB(svrCtx.Config.RootDir) + if err != nil { return err } - serverCtx.Logger.Debug(fmt.Sprintf("received quit signal: %d", errCode.Code)) - return nil + traceWriter, traceCleanupFn, err := SetupTraceWriter(svrCtx.Logger, svrCtx.Viper.GetString(flagTraceStore)) + if err != nil { + return err + } + defer traceCleanupFn() + + app := appCreator(svrCtx.Logger, db, traceWriter, svrCtx.Viper) + + withTM, _ := cmd.Flags().GetBool(flagWithTendermint) + if !withTM { + svrCtx.Logger.Info("starting ABCI without Tendermint") + + return wrapCPUProfile(svrCtx, func() error { + return startStandAlone(svrCtx, svrCfg, clientCtx, app, tmetrics) + }) + } + + return wrapCPUProfile(svrCtx, func() error { + return startInProcess(svrCtx, svrCfg, clientCtx, app, tmetrics) + }) }, } @@ -181,229 +207,218 @@ is performed. Note, when enabled, gRPC will also be automatically enabled. return cmd } -func startStandAlone(ctx *Context, appCreator types.AppCreator) error { - addr := ctx.Viper.GetString(flagAddress) - transport := ctx.Viper.GetString(flagTransport) - home := ctx.Viper.GetString(flags.FlagHome) - - db, err := openDB(home) +func startStandAlone(svrCtx *Context, svrCfg serverconfig.Config, clientCtx client.Context, app types.Application, tmetrics *telemetry.Metrics) error { + svr, err := server.NewServer(svrCtx.Viper.GetString(flagAddress), svrCtx.Viper.GetString(flagTransport), app) if err != nil { - return err + return fmt.Errorf("error creating listener: %v", err) } - traceWriterFile := ctx.Viper.GetString(flagTraceStore) - traceWriter, err := openTraceWriter(traceWriterFile) - if err != nil { - return err - } + svr.SetLogger(svrCtx.Logger.With("module", "abci-server")) - app := appCreator(ctx.Logger, db, traceWriter, ctx.Viper) + g, ctx := getCtx(svrCtx, false) - config, err := serverconfig.GetConfig(ctx.Viper) - if err != nil { - return err - } + // Add the tx service to the gRPC router. We only need to register this + // service if API or gRPC is enabled, and avoid doing so in the general + // case, because it spawns a new local CometBFT RPC client. + if svrCfg.API.Enable || svrCfg.GRPC.Enable { + // create tendermint client + // assumes the rpc listen address is where tendermint has its rpc server + rpcclient, err := rpchttp.New(svrCtx.Config.RPC.ListenAddress, "/websocket") + if err != nil { + return err + } + // re-assign for making the client available below + // do not use := to avoid shadowing clientCtx + clientCtx = clientCtx.WithClient(rpcclient) - _, err = startTelemetry(config) - if err != nil { - return err + // use the provided clientCtx to register the services + app.RegisterTxService(clientCtx) + app.RegisterTendermintService(clientCtx) + if a, ok := app.(types.ApplicationQueryService); ok { + a.RegisterNodeService(clientCtx) + } } - svr, err := server.NewServer(addr, transport, app) + clientCtx, err = startGrpcServer(ctx, g, svrCfg.GRPC, clientCtx, svrCtx, app) if err != nil { - return fmt.Errorf("error creating listener: %v", err) + return err } - svr.SetLogger(ctx.Logger.With("module", "abci-server")) - - err = svr.Start() + err = startAPIServer(ctx, g, clientCtx, svrCfg, svrCtx, app, svrCtx.Config.RootDir, tmetrics) if err != nil { - tmos.Exit(err.Error()) + return err } - defer func() { - if err = svr.Stop(); err != nil { - tmos.Exit(err.Error()) + g.Go(func() error { + if err := svr.Start(); err != nil { + svrCtx.Logger.Error("failed to start out-of-process ABCI server", "err", err) + return err } - }() - // Wait for SIGINT or SIGTERM signal - return WaitForQuitSignals() + // Wait for the calling process to be canceled or close the provided context, + // so we can gracefully stop the ABCI server. + <-ctx.Done() + svrCtx.Logger.Info("stopping the ABCI server...") + return svr.Stop() + }) + + return g.Wait() } -func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.AppCreator) error { - cfg := ctx.Config - home := cfg.RootDir - var cpuProfileCleanup func() +func startInProcess(svrCtx *Context, svrCfg serverconfig.Config, clientCtx client.Context, app types.Application, + tmetrics *telemetry.Metrics, +) error { + tmCfg := svrCtx.Config + gRPCOnly := svrCtx.Viper.GetBool(flagGRPCOnly) - if cpuProfile := ctx.Viper.GetString(flagCPUProfile); cpuProfile != "" { - f, err := os.Create(cpuProfile) + g, ctx := getCtx(svrCtx, true) + + if gRPCOnly { + // TODO: Generalize logic so that gRPC only is really in startStandAlone + svrCtx.Logger.Info("starting node in gRPC only mode; Tendermint is disabled") + svrCfg.GRPC.Enable = true + } else { + svrCtx.Logger.Info("starting node with ABCI Tendermint in-process") + tmNode, cleanupFn, err := startTmNode(ctx, tmCfg, app, svrCtx) if err != nil { return err } + defer cleanupFn() - ctx.Logger.Info("starting CPU profiler", "profile", cpuProfile) - if err := pprof.StartCPUProfile(f); err != nil { - return err - } + // Add the tx service to the gRPC router. We only need to register this + // service if API or gRPC is enabled, and avoid doing so in the general + // case, because it spawns a new local tendermint RPC client. + if svrCfg.API.Enable || svrCfg.GRPC.Enable { + // Re-assign for making the client available below do not use := to avoid + // shadowing the clientCtx variable. + clientCtx = clientCtx.WithClient(local.New(tmNode)) - cpuProfileCleanup = func() { - ctx.Logger.Info("stopping CPU profiler", "profile", cpuProfile) - pprof.StopCPUProfile() - f.Close() + app.RegisterTxService(clientCtx) + app.RegisterTendermintService(clientCtx) + + if a, ok := app.(types.ApplicationQueryService); ok { + a.RegisterNodeService(clientCtx) + } } } - traceWriterFile := ctx.Viper.GetString(flagTraceStore) - db, err := openDB(home) + clientCtx, err := startGrpcServer(ctx, g, svrCfg.GRPC, clientCtx, svrCtx, app) if err != nil { return err } - traceWriter, err := openTraceWriter(traceWriterFile) + err = startAPIServer(ctx, g, clientCtx, svrCfg, svrCtx, app, svrCtx.Config.RootDir, tmetrics) if err != nil { return err } - svrCfg, err := serverconfig.GetConfig(ctx.Viper) - if err != nil { - return err - } + // wait for signal capture and gracefully return + // we are guaranteed to be waiting for the "ListenForQuitSignals" goroutine. + return g.Wait() +} - if err := svrCfg.ValidateBasic(); err != nil { - ctx.Logger.Error("WARNING: The minimum-gas-prices config in app.toml is set to the empty string. " + - "This defaults to 0 in the current version, but will error in the next version " + - "(SDK v0.45). Please explicitly put the desired minimum-gas-prices in your app.toml.") +func genPvFileOnlyWhenKmsAddressEmpty(cfg *tmcfg.Config) *pvm.FilePV { + if len(strings.TrimSpace(cfg.PrivValidatorListenAddr)) == 0 { + return pvm.LoadOrGenFilePV(cfg.PrivValidatorKeyFile(), cfg.PrivValidatorStateFile()) } + return nil +} - app := appCreator(ctx.Logger, db, traceWriter, ctx.Viper) - - nodeKey, err := p2p.LoadOrGenNodeKey(cfg.NodeKeyFile()) +func getAndValidateConfig(svrCtx *Context) (serverconfig.Config, error) { + svrcfg, err := serverconfig.GetConfig(svrCtx.Viper) if err != nil { - return err + return svrcfg, err } - genDocProvider := node.DefaultGenesisDocProviderFunc(cfg) - - var ( - tmNode *node.Node - gRPCOnly = ctx.Viper.GetBool(flagGRPCOnly) - ) - - if gRPCOnly { - ctx.Logger.Info("starting node in gRPC only mode; Tendermint is disabled") - svrCfg.GRPC.Enable = true - } else { - ctx.Logger.Info("starting node with ABCI Tendermint in-process") - - pv := genPvFileOnlyWhenKmsAddressEmpty(cfg) - - tmNode, err = node.NewNode( - cfg, - pv, - nodeKey, - proxy.NewLocalClientCreator(app), - genDocProvider, - node.DefaultDBProvider, - node.DefaultMetricsProvider(cfg.Instrumentation), - ctx.Logger, - ) - if err != nil { - return err - } - ctx.Logger.Debug("initialization: tmNode created") - if err := tmNode.Start(); err != nil { - return err - } - ctx.Logger.Debug("initialization: tmNode started") + if err := svrcfg.ValidateBasic(); err != nil { + return svrcfg, err } + return svrcfg, nil +} - // Add the tx service to the gRPC router. We only need to register this - // service if API or gRPC is enabled, and avoid doing so in the general - // case, because it spawns a new local tendermint RPC client. - if (svrCfg.API.Enable || svrCfg.GRPC.Enable) && tmNode != nil { - clientCtx = clientCtx.WithClient(local.New(tmNode)) +// returns a function which returns the genesis doc from the genesis file. +func getGenDocProvider(cfg *tmcfg.Config) func() (*tmtypes.GenesisDoc, error) { + return node.DefaultGenesisDocProviderFunc(cfg) +} - app.RegisterTxService(clientCtx) - app.RegisterTendermintService(clientCtx) +// SetupTraceWriter sets up the trace writer and returns a cleanup function. +func SetupTraceWriter(logger tmlog.Logger, traceWriterFile string) (traceWriter io.WriteCloser, cleanup func(), err error) { + // clean up the traceWriter when the server is shutting down + cleanup = func() {} - if a, ok := app.(types.ApplicationQueryService); ok { - a.RegisterNodeService(clientCtx) - } - } - - metrics, err := startTelemetry(svrCfg) + traceWriter, err = openTraceWriter(traceWriterFile) if err != nil { - return err + return traceWriter, cleanup, err } - grpcSrv, clientCtx, err := startGrpcServer(svrCfg.GRPC, clientCtx, ctx, app) - if err != nil { - return err + // if flagTraceStore is not used then traceWriter is nil + if traceWriter != nil { + cleanup = func() { + if err = traceWriter.Close(); err != nil { + logger.Error("failed to close trace writer", "err", err) + } + } } - genDoc, err := genDocProvider() + return traceWriter, cleanup, nil +} + +// TODO: Move nodeKey into being created within the function. +func startTmNode( + _ context.Context, + cfg *tmcfg.Config, + app types.Application, + svrCtx *Context, +) (tmNode *node.Node, cleanupFn func(), err error) { + nodeKey, err := p2p.LoadOrGenNodeKey(cfg.NodeKeyFile()) if err != nil { - return err + return nil, cleanupFn, err } - clientCtx.WithChainID(genDoc.ChainID) - apiSrv, err := startAPIServer(svrCfg, clientCtx, ctx, app, cfg.RootDir, metrics) + tmNode, err = node.NewNode( + cfg, + genPvFileOnlyWhenKmsAddressEmpty(cfg), + nodeKey, + proxy.NewLocalClientCreator(app), + getGenDocProvider(cfg), + node.DefaultDBProvider, + node.DefaultMetricsProvider(cfg.Instrumentation), + svrCtx.Logger, + ) if err != nil { - return err + return tmNode, cleanupFn, err } - // At this point it is safe to block the process if we're in gRPC only mode as - // we do not need to handle any Tendermint related processes. - if gRPCOnly { - // wait for signal capture and gracefully return - return WaitForQuitSignals() + svrCtx.Logger.Debug("initialization: tmNode created") + if err := tmNode.Start(); err != nil { + return tmNode, cleanupFn, err } + svrCtx.Logger.Debug("initialization: tmNode started") - defer func() { + cleanupFn = func() { if tmNode != nil && tmNode.IsRunning() { _ = tmNode.Stop() } - - if cpuProfileCleanup != nil { - cpuProfileCleanup() - } - - if grpcSrv != nil { - grpcSrv.Stop() - } - - if apiSrv != nil { - _ = apiSrv.Close() - } - - ctx.Logger.Info("exiting...") - }() - - // wait for signal capture and gracefully return - return WaitForQuitSignals() -} - -func genPvFileOnlyWhenKmsAddressEmpty(cfg *config.Config) *pvm.FilePV { - if len(strings.TrimSpace(cfg.PrivValidatorListenAddr)) == 0 { - return pvm.LoadOrGenFilePV(cfg.PrivValidatorKeyFile(), cfg.PrivValidatorStateFile()) } - return nil + + return tmNode, cleanupFn, nil } func startGrpcServer( + ctx context.Context, + g *errgroup.Group, config serverconfig.GRPCConfig, clientCtx client.Context, svrCtx *Context, app types.Application, -) (*grpc.Server, client.Context, error) { +) (client.Context, error) { if !config.Enable { // return grpcServer as nil if gRPC is disabled - return nil, clientCtx, nil + return clientCtx, nil } _, _, err := net.SplitHostPort(config.Address) if err != nil { - return nil, clientCtx, err + return clientCtx, err } maxSendMsgSize := config.MaxSendMsgSize @@ -427,30 +442,37 @@ func startGrpcServer( ), ) if err != nil { - return nil, clientCtx, err + return clientCtx, err } clientCtx = clientCtx.WithGRPCClient(grpcClient) svrCtx.Logger.Debug("gRPC client assigned to client context", "target", config.Address) - grpcSrv, err := servergrpc.StartGRPCServer(clientCtx, app, config) + grpcSrv, err := servergrpc.NewGRPCServer(clientCtx, app, config) if err != nil { - return nil, clientCtx, err + return clientCtx, err } - return grpcSrv, clientCtx, nil + // Start the gRPC server in a goroutine. Note, the provided ctx will ensure + // that the server is gracefully shut down. + g.Go(func() error { + return servergrpc.StartGRPCServer(ctx, svrCtx.Logger.With("module", "grpc-server"), config, grpcSrv) + }) + return clientCtx, nil } func startAPIServer( - svrCfg serverconfig.Config, + ctx context.Context, + g *errgroup.Group, clientCtx client.Context, + svrCfg serverconfig.Config, svrCtx *Context, app types.Application, home string, metrics *telemetry.Metrics, -) (*api.Server, error) { +) error { if !svrCfg.API.Enable { - return nil, nil + return nil } clientCtx = clientCtx.WithHomeDir(home) @@ -462,26 +484,78 @@ func startAPIServer( apiSrv.SetTelemetry(metrics) } + g.Go(func() error { + return apiSrv.Start(ctx, svrCfg) + }) + return nil +} + +func startTelemetry(cfg serverconfig.Config) (*telemetry.Metrics, error) { + if !cfg.Telemetry.Enabled { + return nil, nil + } + return telemetry.New(cfg.Telemetry) +} + +// wrapCPUProfile starts CPU profiling, if enabled, and executes the provided +// callbackFn in a separate goroutine, then will wait for that callback to +// return. +// +// NOTE: We expect the caller to handle graceful shutdown and signal handling. +func wrapCPUProfile(svrCtx *Context, callbackFn func() error) error { + if cpuProfile := svrCtx.Viper.GetString(flagCPUProfile); cpuProfile != "" { + f, err := os.Create(cpuProfile) + if err != nil { + return err + } + + svrCtx.Logger.Info("starting CPU profiler", "profile", cpuProfile) + + if err := pprof.StartCPUProfile(f); err != nil { + return err + } + + defer func() { + svrCtx.Logger.Info("stopping CPU profiler", "profile", cpuProfile) + pprof.StopCPUProfile() + + if err := f.Close(); err != nil { + svrCtx.Logger.Info("failed to close cpu-profile file", "profile", cpuProfile, "err", err.Error()) + } + }() + } + errCh := make(chan error) go func() { - if err := apiSrv.Start(svrCfg); err != nil { - errCh <- err - } + errCh <- callbackFn() }() - select { - case err := <-errCh: - return nil, err + return <-errCh +} + +// emitServerInfoMetrics emits server info related metrics using application telemetry. +func emitServerInfoMetrics() { + var ls []metrics.Label + + versionInfo := version.NewInfo() + if len(versionInfo.GoVersion) > 0 { + ls = append(ls, telemetry.NewLabel("go", versionInfo.GoVersion)) + } + if len(versionInfo.LbmSdkVersion) > 0 { + ls = append(ls, telemetry.NewLabel("version", versionInfo.LbmSdkVersion)) + } - case <-time.After(types.ServerStartTime): // assume server started successfully + if len(ls) == 0 { + return } - return apiSrv, nil + telemetry.SetGaugeWithLabels([]string{"server", "info"}, 1, ls) } -func startTelemetry(cfg serverconfig.Config) (*telemetry.Metrics, error) { - if !cfg.Telemetry.Enabled { - return nil, nil - } - return telemetry.New(cfg.Telemetry) +func getCtx(svrCtx *Context, block bool) (*errgroup.Group, context.Context) { + ctx, cancelFn := context.WithCancel(context.Background()) + g, ctx := errgroup.WithContext(ctx) + // listen for quit signals so the calling parent process can gracefully exit + ListenForQuitSignals(g, block, cancelFn, svrCtx.Logger) + return g, ctx } diff --git a/server/types/app.go b/server/types/app.go index 397d2c1030..0e2ca5dc39 100644 --- a/server/types/app.go +++ b/server/types/app.go @@ -3,7 +3,6 @@ package types import ( "encoding/json" "io" - "time" "github.com/gogo/protobuf/grpc" "github.com/spf13/cobra" @@ -18,10 +17,6 @@ import ( sdk "github.com/Finschia/finschia-sdk/types" ) -// ServerStartTime defines the time duration that the server need to stay running after startup -// for the startup be considered successful -const ServerStartTime = 5 * time.Second - type ( // AppOptions defines an interface that is passed into an application // constructor, typically used to set BaseApp options that are either supplied diff --git a/server/util.go b/server/util.go index 78fa9fb670..735b994a81 100644 --- a/server/util.go +++ b/server/util.go @@ -1,6 +1,7 @@ package server import ( + "context" "errors" "fmt" "io" @@ -23,6 +24,7 @@ import ( tmcfg "github.com/tendermint/tendermint/config" tmlog "github.com/tendermint/tendermint/libs/log" dbm "github.com/tendermint/tm-db" + "golang.org/x/sync/errgroup" "github.com/Finschia/finschia-sdk/client/flags" "github.com/Finschia/finschia-sdk/server/config" @@ -349,12 +351,31 @@ func TrapSignal(cleanupFunc func()) { }() } -// WaitForQuitSignals waits for SIGINT and SIGTERM and returns. -func WaitForQuitSignals() ErrorCode { - sigs := make(chan os.Signal, 1) - signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) - sig := <-sigs - return ErrorCode{Code: int(sig.(syscall.Signal)) + 128} +// ListenForQuitSignals listens for SIGINT and SIGTERM. When a signal is received, +// the cleanup function is called, indicating the caller can gracefully exit or +// return. +// +// Note, the blocking behavior of this depends on the block argument. +// The caller must ensure the corresponding context derived from the cancelFn is used correctly. +func ListenForQuitSignals(g *errgroup.Group, block bool, cancelFn context.CancelFunc, logger tmlog.Logger) { + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + + f := func() { + sig := <-sigCh + cancelFn() + + logger.Info("caught signal", "signal", sig.String()) + } + + if block { + g.Go(func() error { + f() + return nil + }) + } else { + go f() + } } func skipInterface(iface net.Interface) bool { @@ -386,7 +407,7 @@ func openDB(rootDir string) (dbm.DB, error) { return sdk.NewLevelDB("application", dataDir) } -func openTraceWriter(traceWriterFile string) (w io.Writer, err error) { +func openTraceWriter(traceWriterFile string) (w io.WriteCloser, err error) { if traceWriterFile == "" { return } diff --git a/testutil/network/network.go b/testutil/network/network.go index cc2e8294b0..183f984a91 100644 --- a/testutil/network/network.go +++ b/testutil/network/network.go @@ -21,6 +21,7 @@ import ( "github.com/tendermint/tendermint/node" tmclient "github.com/tendermint/tendermint/rpc/client" dbm "github.com/tendermint/tm-db" + "golang.org/x/sync/errgroup" "google.golang.org/grpc" "github.com/Finschia/finschia-sdk/baseapp" @@ -159,9 +160,12 @@ type ( ValAddress sdk.ValAddress RPCClient tmclient.Client - tmNode *node.Node - api *api.Server - grpc *grpc.Server + app servertypes.Application + tmNode *node.Node + api *api.Server + grpc *grpc.Server + errGroup *errgroup.Group + cancelFn context.CancelFunc } ) diff --git a/testutil/network/util.go b/testutil/network/util.go index 6bd31cde42..020051ad58 100644 --- a/testutil/network/util.go +++ b/testutil/network/util.go @@ -1,9 +1,9 @@ package network import ( + "context" "encoding/json" "path/filepath" - "time" tmos "github.com/tendermint/tendermint/libs/os" "github.com/tendermint/tendermint/node" @@ -13,6 +13,7 @@ import ( "github.com/tendermint/tendermint/rpc/client/local" "github.com/tendermint/tendermint/types" tmtime "github.com/tendermint/tendermint/types/time" + "golang.org/x/sync/errgroup" "github.com/Finschia/finschia-sdk/server/api" servergrpc "github.com/Finschia/finschia-sdk/server/grpc" @@ -38,6 +39,7 @@ func startInProcess(cfg Config, val *Validator) error { } app := cfg.AppConstructor(*val) + val.app = app genDocProvider := node.DefaultGenesisDocProviderFunc(tmCfg) pv := pvm.LoadOrGenFilePV(tmCfg.PrivValidatorKeyFile(), tmCfg.PrivValidatorStateFile()) @@ -78,12 +80,23 @@ func startInProcess(cfg Config, val *Validator) error { } } - if val.AppConfig.GRPC.Enable { - grpcSrv, err := servergrpc.StartGRPCServer(val.ClientCtx, app, val.AppConfig.GRPC) + ctx := context.Background() + ctx, val.cancelFn = context.WithCancel(ctx) + val.errGroup, ctx = errgroup.WithContext(ctx) + grpcCfg := val.AppConfig.GRPC + + if grpcCfg.Enable { + grpcSrv, err := servergrpc.NewGRPCServer(val.ClientCtx, app, grpcCfg) if err != nil { return err } + // Start the gRPC server in a goroutine. Note, the provided ctx will ensure + // that the server is gracefully shut down. + val.errGroup.Go(func() error { + return servergrpc.StartGRPCServer(ctx, logger.With("module", "grpc-server"), grpcCfg, grpcSrv) + }) + val.grpc = grpcSrv } @@ -91,19 +104,9 @@ func startInProcess(cfg Config, val *Validator) error { apiSrv := api.New(val.ClientCtx, logger.With("module", "api-server")) app.RegisterAPIRoutes(apiSrv, val.AppConfig.API) - errCh := make(chan error) - - go func() { - if err := apiSrv.Start(*val.AppConfig); err != nil { - errCh <- err - } - }() - - select { - case err := <-errCh: - return err - case <-time.After(srvtypes.ServerStartTime): // assume server started successfully - } + val.errGroup.Go(func() error { + return apiSrv.Start(ctx, *val.AppConfig) + }) val.api = apiSrv }