diff --git a/cmd/components/data_coord.go b/cmd/components/data_coord.go index f7878314739f3..977a52a42dece 100644 --- a/cmd/components/data_coord.go +++ b/cmd/components/data_coord.go @@ -18,6 +18,7 @@ package components import ( "context" + "time" "go.uber.org/zap" @@ -26,6 +27,7 @@ import ( grpcdatacoordclient "github.com/milvus-io/milvus/internal/distributed/datacoord" "github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -57,10 +59,8 @@ func (s *DataCoord) Run() error { // Stop terminates service func (s *DataCoord) Stop() error { - if err := s.svr.Stop(); err != nil { - return err - } - return nil + timeout := paramtable.Get().DataCoordCfg.GracefulStopTimeout.GetAsDuration(time.Second) + return exitWhenStopTimeout(s.svr.Stop, timeout) } // GetComponentStates returns DataCoord's states diff --git a/cmd/components/data_node.go b/cmd/components/data_node.go index 25a7b9a91c37c..8fbba83a0800d 100644 --- a/cmd/components/data_node.go +++ b/cmd/components/data_node.go @@ -18,6 +18,7 @@ package components import ( "context" + "time" "go.uber.org/zap" @@ -26,6 +27,7 @@ import ( grpcdatanode "github.com/milvus-io/milvus/internal/distributed/datanode" "github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -60,10 +62,8 @@ func (d *DataNode) Run() error { // Stop terminates service func (d *DataNode) Stop() error { - if err := d.svr.Stop(); err != nil { - return err - } - return nil + timeout := paramtable.Get().DataNodeCfg.GracefulStopTimeout.GetAsDuration(time.Second) + return exitWhenStopTimeout(d.svr.Stop, timeout) } // GetComponentStates returns DataNode's states diff --git a/cmd/components/index_node.go b/cmd/components/index_node.go index 4f947d35f4158..edf72384d4d2d 100644 --- a/cmd/components/index_node.go +++ b/cmd/components/index_node.go @@ -18,6 +18,7 @@ package components import ( "context" + "time" "go.uber.org/zap" @@ -26,6 +27,7 @@ import ( grpcindexnode "github.com/milvus-io/milvus/internal/distributed/indexnode" "github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -58,10 +60,8 @@ func (n *IndexNode) Run() error { // Stop terminates service func (n *IndexNode) Stop() error { - if err := n.svr.Stop(); err != nil { - return err - } - return nil + timeout := paramtable.Get().IndexNodeCfg.GracefulStopTimeout.GetAsDuration(time.Second) + return exitWhenStopTimeout(n.svr.Stop, timeout) } // GetComponentStates returns IndexNode's states diff --git a/cmd/components/proxy.go b/cmd/components/proxy.go index 61a62df495538..cb74b36680a90 100644 --- a/cmd/components/proxy.go +++ b/cmd/components/proxy.go @@ -18,6 +18,7 @@ package components import ( "context" + "time" "go.uber.org/zap" @@ -26,6 +27,7 @@ import ( grpcproxy "github.com/milvus-io/milvus/internal/distributed/proxy" "github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -59,10 +61,8 @@ func (n *Proxy) Run() error { // Stop terminates service func (n *Proxy) Stop() error { - if err := n.svr.Stop(); err != nil { - return err - } - return nil + timeout := paramtable.Get().ProxyCfg.GracefulStopTimeout.GetAsDuration(time.Second) + return exitWhenStopTimeout(n.svr.Stop, timeout) } // GetComponentStates returns Proxy's states diff --git a/cmd/components/query_coord.go b/cmd/components/query_coord.go index 3c893ad697631..c98812d86ef62 100644 --- a/cmd/components/query_coord.go +++ b/cmd/components/query_coord.go @@ -18,6 +18,7 @@ package components import ( "context" + "time" "go.uber.org/zap" @@ -26,6 +27,7 @@ import ( grpcquerycoord "github.com/milvus-io/milvus/internal/distributed/querycoord" "github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -60,10 +62,8 @@ func (qs *QueryCoord) Run() error { // Stop terminates service func (qs *QueryCoord) Stop() error { - if err := qs.svr.Stop(); err != nil { - return err - } - return nil + timeout := paramtable.Get().QueryCoordCfg.GracefulStopTimeout.GetAsDuration(time.Second) + return exitWhenStopTimeout(qs.svr.Stop, timeout) } // GetComponentStates returns QueryCoord's states diff --git a/cmd/components/query_node.go b/cmd/components/query_node.go index 50570ec152fe4..3857f81bafa49 100644 --- a/cmd/components/query_node.go +++ b/cmd/components/query_node.go @@ -18,6 +18,7 @@ package components import ( "context" + "time" "go.uber.org/zap" @@ -26,6 +27,7 @@ import ( grpcquerynode "github.com/milvus-io/milvus/internal/distributed/querynode" "github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -60,10 +62,8 @@ func (q *QueryNode) Run() error { // Stop terminates service func (q *QueryNode) Stop() error { - if err := q.svr.Stop(); err != nil { - return err - } - return nil + timeout := paramtable.Get().QueryNodeCfg.GracefulStopTimeout.GetAsDuration(time.Second) + return exitWhenStopTimeout(q.svr.Stop, timeout) } // GetComponentStates returns QueryNode's states diff --git a/cmd/components/root_coord.go b/cmd/components/root_coord.go index 720511902a911..e130516ac8d16 100644 --- a/cmd/components/root_coord.go +++ b/cmd/components/root_coord.go @@ -18,6 +18,7 @@ package components import ( "context" + "time" "go.uber.org/zap" @@ -26,6 +27,7 @@ import ( rc "github.com/milvus-io/milvus/internal/distributed/rootcoord" "github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -59,10 +61,8 @@ func (rc *RootCoord) Run() error { // Stop terminates service func (rc *RootCoord) Stop() error { - if rc.svr != nil { - return rc.svr.Stop() - } - return nil + timeout := paramtable.Get().RootCoordCfg.GracefulStopTimeout.GetAsDuration(time.Second) + return exitWhenStopTimeout(rc.svr.Stop, timeout) } // GetComponentStates returns RootCoord's states diff --git a/cmd/components/util.go b/cmd/components/util.go new file mode 100644 index 0000000000000..d731bb6e86f5f --- /dev/null +++ b/cmd/components/util.go @@ -0,0 +1,38 @@ +package components + +import ( + "context" + "os" + "time" + + "github.com/cockroachdb/errors" + + "github.com/milvus-io/milvus/pkg/util/conc" +) + +var errStopTimeout = errors.New("stop timeout") + +// exitWhenStopTimeout stops a component with timeout and exit progress when timeout. +func exitWhenStopTimeout(stop func() error, timeout time.Duration) error { + err := stopWithTimeout(stop, timeout) + if errors.Is(err, errStopTimeout) { + os.Exit(1) + } + return err +} + +// stopWithTimeout stops a component with timeout. +func stopWithTimeout(stop func() error, timeout time.Duration) error { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + future := conc.Go(func() (struct{}, error) { + return struct{}{}, stop() + }) + select { + case <-future.Inner(): + return errors.Wrap(future.Err(), "failed to stop component") + case <-ctx.Done(): + return errStopTimeout + } +} diff --git a/cmd/components/util_test.go b/cmd/components/util_test.go new file mode 100644 index 0000000000000..4490b20c8d94e --- /dev/null +++ b/cmd/components/util_test.go @@ -0,0 +1,38 @@ +package components + +import ( + "testing" + "time" + + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/assert" +) + +func TestExitWithTimeout(t *testing.T) { + // only normal path can be tested. + targetErr := errors.New("stop error") + err := exitWhenStopTimeout(func() error { + time.Sleep(1 * time.Second) + return targetErr + }, 5*time.Second) + assert.ErrorIs(t, err, targetErr) +} + +func TestStopWithTimeout(t *testing.T) { + ch := make(chan struct{}) + stop := func() error { + <-ch + return nil + } + + err := stopWithTimeout(stop, 1*time.Second) + assert.ErrorIs(t, err, errStopTimeout) + + targetErr := errors.New("stop error") + stop = func() error { + return targetErr + } + + err = stopWithTimeout(stop, 1*time.Second) + assert.ErrorIs(t, err, targetErr) +} diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 79831d918a9aa..d73356db92456 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -269,16 +269,7 @@ func (s *Server) Register() error { s.session.LivenessCheck(s.serverLoopCtx, func() { logutil.Logger(s.ctx).Error("disconnected from etcd and exited", zap.Int64("serverID", s.session.GetServerID())) - if err := s.Stop(); err != nil { - logutil.Logger(s.ctx).Fatal("failed to stop server", zap.Error(err)) - } - metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.DataCoordRole).Dec() - // manually send signal to starter goroutine - if s.session.IsTriggerKill() { - if p, err := os.FindProcess(os.Getpid()); err == nil { - p.Signal(syscall.SIGINT) - } - } + os.Exit(1) }) return nil } @@ -1050,15 +1041,23 @@ func (s *Server) Stop() error { return nil } logutil.Logger(s.ctx).Info("server shutdown") - s.cluster.Close() s.garbageCollector.close() - s.stopServerLoop() + logutil.Logger(s.ctx).Info("datacoord garbage collector stopped") if Params.DataCoordCfg.EnableCompaction.GetAsBool() { s.stopCompactionTrigger() s.stopCompactionHandler() } + logutil.Logger(s.ctx).Info("datacoord compaction stopped") + s.indexBuilder.Stop() + logutil.Logger(s.ctx).Info("datacoord index builder stopped") + + s.cluster.Close() + logutil.Logger(s.ctx).Info("index builder stopped") + + s.stopServerLoop() + logutil.Logger(s.ctx).Info("serverloop stopped") if s.session != nil { s.session.Stop() @@ -1067,6 +1066,7 @@ func (s *Server) Stop() error { if s.icSession != nil { s.icSession.Stop() } + logutil.Logger(s.ctx).Warn("datacoord stop successful") return nil } diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index ede9742d81685..20e1e4640d163 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -27,7 +27,6 @@ import ( "os" "sync" "sync/atomic" - "syscall" "time" "github.com/cockroachdb/errors" @@ -188,16 +187,7 @@ func (node *DataNode) Register() error { // Start liveness check node.session.LivenessCheck(node.ctx, func() { log.Error("Data Node disconnected from etcd, process will exit", zap.Int64("Server Id", node.GetSession().ServerID)) - if err := node.Stop(); err != nil { - log.Fatal("failed to stop server", zap.Error(err)) - } - metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.DataNodeRole).Dec() - // manually send signal to starter goroutine - if node.session.TriggerKill { - if p, err := os.FindProcess(os.Getpid()); err == nil { - p.Signal(syscall.SIGINT) - } - } + os.Exit(1) }) return nil diff --git a/internal/distributed/datacoord/service.go b/internal/distributed/datacoord/service.go index fc8d21ecd094f..b94c81f291165 100644 --- a/internal/distributed/datacoord/service.go +++ b/internal/distributed/datacoord/service.go @@ -218,10 +218,14 @@ func (s *Server) start() error { // Stop stops the DataCoord server gracefully. // Need to call the GracefulStop interface of grpc server and call the stop method of the inner DataCoord object. -func (s *Server) Stop() error { +func (s *Server) Stop() (err error) { Params := ¶mtable.Get().DataCoordGrpcServerCfg - log.Debug("Datacoord stop", zap.String("Address", Params.GetAddress())) - var err error + logger := log.With(zap.String("address", Params.GetAddress())) + logger.Info("Datacoord stopping") + defer func() { + logger.Info("Datacoord stopped", zap.Error(err)) + }() + s.cancel() if s.etcdCli != nil { diff --git a/internal/distributed/datanode/service.go b/internal/distributed/datanode/service.go index a7e549e7f09ca..c2c39116780da 100644 --- a/internal/distributed/datanode/service.go +++ b/internal/distributed/datanode/service.go @@ -198,9 +198,14 @@ func (s *Server) Run() error { } // Stop stops Datanode's grpc service. -func (s *Server) Stop() error { +func (s *Server) Stop() (err error) { Params := ¶mtable.Get().DataNodeGrpcServerCfg - log.Debug("Datanode stop", zap.String("Address", Params.GetAddress())) + logger := log.With(zap.String("address", Params.GetAddress())) + logger.Info("Datanode stopping") + defer func() { + logger.Info("Datanode stopped", zap.Error(err)) + }() + s.cancel() if s.etcdCli != nil { defer s.etcdCli.Close() @@ -209,7 +214,7 @@ func (s *Server) Stop() error { utils.GracefulStopGRPCServer(s.grpcServer) } - err := s.datanode.Stop() + err = s.datanode.Stop() if err != nil { return err } diff --git a/internal/distributed/indexnode/service.go b/internal/distributed/indexnode/service.go index 10b3b8ac02a09..af1b200cbd567 100644 --- a/internal/distributed/indexnode/service.go +++ b/internal/distributed/indexnode/service.go @@ -208,9 +208,14 @@ func (s *Server) start() error { } // Stop stops IndexNode's grpc service. -func (s *Server) Stop() error { +func (s *Server) Stop() (err error) { Params := ¶mtable.Get().IndexNodeGrpcServerCfg - log.Debug("IndexNode stop", zap.String("Address", Params.GetAddress())) + logger := log.With(zap.String("address", Params.GetAddress())) + logger.Info("IndexNode stopping") + defer func() { + logger.Info("IndexNode stopped", zap.Error(err)) + }() + if s.indexnode != nil { s.indexnode.Stop() } diff --git a/internal/distributed/proxy/service.go b/internal/distributed/proxy/service.go index e0a2ab1a00b34..0059e684c99e8 100644 --- a/internal/distributed/proxy/service.go +++ b/internal/distributed/proxy/service.go @@ -662,9 +662,13 @@ func (s *Server) start() error { } // Stop stop the Proxy Server -func (s *Server) Stop() error { +func (s *Server) Stop() (err error) { Params := ¶mtable.Get().ProxyGrpcServerCfg - log.Debug("Proxy stop", zap.String("internal address", Params.GetInternalAddress()), zap.String("external address", Params.GetInternalAddress())) + logger := log.With(zap.String("internal address", Params.GetInternalAddress()), zap.String("external address", Params.GetInternalAddress())) + logger.Info("Proxy stopping") + defer func() { + logger.Info("Proxy stopped", zap.Error(err)) + }() if s.etcdCli != nil { defer s.etcdCli.Close() @@ -708,7 +712,7 @@ func (s *Server) Stop() error { s.wg.Wait() - err := s.proxy.Stop() + err = s.proxy.Stop() if err != nil { return err } diff --git a/internal/distributed/querycoord/service.go b/internal/distributed/querycoord/service.go index 30271af103d6b..bf65d80a283b3 100644 --- a/internal/distributed/querycoord/service.go +++ b/internal/distributed/querycoord/service.go @@ -269,9 +269,14 @@ func (s *Server) start() error { } // Stop stops QueryCoord's grpc service. -func (s *Server) Stop() error { +func (s *Server) Stop() (err error) { Params := ¶mtable.Get().QueryCoordGrpcServerCfg - log.Debug("QueryCoord stop", zap.String("Address", Params.GetAddress())) + logger := log.With(zap.String("address", Params.GetAddress())) + logger.Info("QueryCoord stopping") + defer func() { + logger.Info("QueryCoord stopped", zap.Error(err)) + }() + if s.etcdCli != nil { defer s.etcdCli.Close() } @@ -279,9 +284,7 @@ func (s *Server) Stop() error { if s.grpcServer != nil { utils.GracefulStopGRPCServer(s.grpcServer) } - err := s.queryCoord.Stop() - - return err + return s.queryCoord.Stop() } // SetRootCoord sets root coordinator's client diff --git a/internal/distributed/querynode/service.go b/internal/distributed/querynode/service.go index a94c68d221943..2ad14f15d2907 100644 --- a/internal/distributed/querynode/service.go +++ b/internal/distributed/querynode/service.go @@ -233,10 +233,15 @@ func (s *Server) Run() error { } // Stop stops QueryNode's grpc service. -func (s *Server) Stop() error { +func (s *Server) Stop() (err error) { Params := ¶mtable.Get().QueryNodeGrpcServerCfg - log.Debug("QueryNode stop", zap.String("Address", Params.GetAddress())) - err := s.querynode.Stop() + logger := log.With(zap.String("address", Params.GetAddress())) + logger.Info("QueryNode stopping") + defer func() { + logger.Info("QueryNode stopped", zap.Error(err)) + }() + + err = s.querynode.Stop() if err != nil { return err } diff --git a/internal/distributed/rootcoord/service.go b/internal/distributed/rootcoord/service.go index 7d8bb0af49c41..49d12e227fe79 100644 --- a/internal/distributed/rootcoord/service.go +++ b/internal/distributed/rootcoord/service.go @@ -312,9 +312,14 @@ func (s *Server) start() error { return nil } -func (s *Server) Stop() error { +func (s *Server) Stop() (err error) { Params := ¶mtable.Get().RootCoordGrpcServerCfg - log.Debug("Rootcoord stop", zap.String("Address", Params.GetAddress())) + logger := log.With(zap.String("address", Params.GetAddress())) + logger.Info("Rootcoord stopping") + defer func() { + logger.Info("Rootcoord stopped", zap.Error(err)) + }() + if s.etcdCli != nil { defer s.etcdCli.Close() } diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go index dcf9804a8cea6..504f1ce137174 100644 --- a/internal/indexnode/indexnode.go +++ b/internal/indexnode/indexnode.go @@ -35,7 +35,6 @@ import ( "path" "path/filepath" "sync" - "syscall" "time" "unsafe" @@ -139,16 +138,7 @@ func (i *IndexNode) Register() error { // start liveness check i.session.LivenessCheck(i.loopCtx, func() { log.Error("Index Node disconnected from etcd, process will exit", zap.Int64("Server Id", i.session.ServerID)) - if err := i.Stop(); err != nil { - log.Fatal("failed to stop server", zap.Error(err)) - } - metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.IndexNodeRole).Dec() - // manually send signal to starter goroutine - if i.session.TriggerKill { - if p, err := os.FindProcess(os.Getpid()); err == nil { - p.Signal(syscall.SIGINT) - } - } + os.Exit(1) }) return nil } diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 77ee7d8368d26..afd8bd5fe4c7b 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -23,7 +23,6 @@ import ( "os" "strconv" "sync" - "syscall" "time" "github.com/cockroachdb/errors" @@ -166,15 +165,7 @@ func (node *Proxy) Register() error { log.Info("Proxy Register Finished") node.session.LivenessCheck(node.ctx, func() { log.Error("Proxy disconnected from etcd, process will exit", zap.Int64("Server Id", node.session.ServerID)) - if err := node.Stop(); err != nil { - log.Fatal("failed to stop server", zap.Error(err)) - } - metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.ProxyRole).Dec() - if node.session.TriggerKill { - if p, err := os.FindProcess(os.Getpid()); err == nil { - p.Signal(syscall.SIGINT) - } - } + os.Exit(1) }) // TODO Reset the logger // Params.initLogCfg() diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index 8f9259c2aa47f..a8ab9f136c5e3 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -151,16 +151,7 @@ func (s *Server) Register() error { metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.QueryCoordRole).Inc() s.session.LivenessCheck(s.ctx, func() { log.Error("QueryCoord disconnected from etcd, process will exit", zap.Int64("serverID", s.session.GetServerID())) - if err := s.Stop(); err != nil { - log.Fatal("failed to stop server", zap.Error(err)) - } - metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.QueryCoordRole).Dec() - // manually send signal to starter goroutine - if s.session.IsTriggerKill() { - if p, err := os.FindProcess(os.Getpid()); err == nil { - p.Signal(syscall.SIGINT) - } - } + os.Exit(1) }) return nil } diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go index c7fbfe56a2746..f7ce1e578cb8a 100644 --- a/internal/querynodev2/server.go +++ b/internal/querynodev2/server.go @@ -37,7 +37,6 @@ import ( "runtime/debug" "strings" "sync" - "syscall" "time" "unsafe" @@ -168,16 +167,7 @@ func (node *QueryNode) Register() error { metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.QueryNodeRole).Inc() node.session.LivenessCheck(node.ctx, func() { log.Error("Query Node disconnected from etcd, process will exit", zap.Int64("Server Id", paramtable.GetNodeID())) - if err := node.Stop(); err != nil { - log.Fatal("failed to stop server", zap.Error(err)) - } - metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.QueryNodeRole).Dec() - // manually send signal to starter goroutine - if node.session.IsTriggerKill() { - if p, err := os.FindProcess(os.Getpid()); err == nil { - p.Signal(syscall.SIGINT) - } - } + os.Exit(1) }) return nil } @@ -418,9 +408,7 @@ func (node *QueryNode) Stop() error { log.Warn("session fail to go stopping state", zap.Error(err)) } else { metrics.StoppingBalanceNodeNum.WithLabelValues().Set(1) - timeoutCh := time.After(paramtable.Get().QueryNodeCfg.GracefulStopTimeout.GetAsDuration(time.Second)) - outer: for (node.manager != nil && !node.manager.Segment.Empty()) || (node.pipelineManager != nil && node.pipelineManager.Num() != 0) { var ( @@ -436,25 +424,22 @@ func (node *QueryNode) Stop() error { channelNum = node.pipelineManager.Num() } - select { - case <-timeoutCh: - log.Warn("migrate data timed out", zap.Int64("ServerID", paramtable.GetNodeID()), - zap.Int64s("sealedSegments", lo.Map(sealedSegments, func(s segments.Segment, i int) int64 { - return s.ID() - })), - zap.Int64s("growingSegments", lo.Map(growingSegments, func(t segments.Segment, i int) int64 { - return t.ID() - })), - zap.Int("channelNum", channelNum), - ) - break outer - - case <-time.After(time.Second): - metrics.StoppingBalanceSegmentNum.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(len(sealedSegments))) - metrics.StoppingBalanceChannelNum.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(channelNum)) - } + log.Info("migrate data...", zap.Int64("ServerID", paramtable.GetNodeID()), + zap.Int64s("sealedSegments", lo.Map(sealedSegments, func(s segments.Segment, i int) int64 { + return s.ID() + })), + zap.Int64s("growingSegments", lo.Map(growingSegments, func(t segments.Segment, i int) int64 { + return t.ID() + })), + zap.Int("channelNum", channelNum), + ) + metrics.StoppingBalanceSegmentNum.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(len(sealedSegments))) + metrics.StoppingBalanceChannelNum.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(channelNum)) + // Metrics is collected every 15 seconds or more. + <-time.After(5 * time.Second) } + log.Info("query node is empty, ready to stop", zap.Int64("ServerID", paramtable.GetNodeID())) metrics.StoppingBalanceNodeNum.WithLabelValues().Set(0) metrics.StoppingBalanceSegmentNum.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(0) metrics.StoppingBalanceChannelNum.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(0) diff --git a/internal/querynodev2/server_test.go b/internal/querynodev2/server_test.go index b6c8ed0409863..c7e589ab0d743 100644 --- a/internal/querynodev2/server_test.go +++ b/internal/querynodev2/server_test.go @@ -37,6 +37,7 @@ import ( "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/internal/util/sessionutil" + "github.com/milvus-io/milvus/pkg/util/conc" "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/paramtable" ) @@ -214,8 +215,6 @@ func (suite *QueryNodeSuite) TestInit_QueryHook() { } func (suite *QueryNodeSuite) TestStop() { - paramtable.Get().Save(paramtable.Get().QueryNodeCfg.GracefulStopTimeout.Key, "2") - suite.node.manager = segments.NewManager() mockSession := sessionutil.NewMockSession(suite.T()) mockSession.EXPECT().GoingStop().Return(nil) @@ -227,7 +226,20 @@ func (suite *QueryNodeSuite) TestStop() { segment, err := segments.NewSegment(context.Background(), collection, 100, 10, 1, "test_stop_channel", segments.SegmentTypeSealed, 1, nil, nil) suite.NoError(err) suite.node.manager.Segment.Put(segments.SegmentTypeSealed, segment) - err = suite.node.Stop() + future := conc.Go(func() (struct{}, error) { + return struct{}{}, suite.node.Stop() + }) + // Graceful stop, should wait for all segments to be released + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + select { + case <-future.Inner(): + suite.FailNow("stop should be blocked") + case <-ctx.Done(): + } + suite.node.manager.Segment.Clear() + + _, err = future.Await() suite.NoError(err) suite.True(suite.node.manager.Segment.Empty()) } diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index d9fc66aa6a566..b0c0c58b5ed2e 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -22,7 +22,6 @@ import ( "math/rand" "os" "sync" - "syscall" "time" "github.com/cockroachdb/errors" @@ -280,16 +279,7 @@ func (c *Core) Register() error { log.Info("RootCoord Register Finished") c.session.LivenessCheck(c.ctx, func() { log.Error("Root Coord disconnected from etcd, process will exit", zap.Int64("Server Id", c.session.ServerID)) - if err := c.Stop(); err != nil { - log.Fatal("failed to stop server", zap.Error(err)) - } - metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.RootCoordRole).Dec() - // manually send signal to starter goroutine - if c.session.TriggerKill { - if p, err := os.FindProcess(os.Getpid()); err == nil { - p.Signal(syscall.SIGINT) - } - } + os.Exit(1) }) return nil diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index be093f9e8a371..66f311b269ee6 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -34,7 +34,9 @@ const ( DefaultIndexSliceSize = 16 DefaultConsistencyLevelUsedInDelete = commonpb.ConsistencyLevel_Bounded DefaultGracefulTime = 5000 // ms - DefaultGracefulStopTimeout = 1800 // s + DefaultGracefulStopTimeout = 1800 // s, for node + DefaultProxyGracefulStopTimeout = 30 // s,for proxy + DefaultCoordGracefulStopTimeout = 5 // s,for coord DefaultHighPriorityThreadCoreCoefficient = 10 DefaultMiddlePriorityThreadCoreCoefficient = 5 DefaultLowPriorityThreadCoreCoefficient = 1 @@ -819,6 +821,7 @@ type rootCoordConfig struct { EnableActiveStandby ParamItem `refreshable:"false"` MaxDatabaseNum ParamItem `refreshable:"false"` MaxGeneralCapacity ParamItem `refreshable:"true"` + GracefulStopTimeout ParamItem `refreshable:"true"` } func (p *rootCoordConfig) init(base *BaseTable) { @@ -913,6 +916,15 @@ func (p *rootCoordConfig) init(base *BaseTable) { }, } p.MaxGeneralCapacity.Init(base.mgr) + + p.GracefulStopTimeout = ParamItem{ + Key: "rootCoord.gracefulStopTimeout", + Version: "2.3.7", + DefaultValue: strconv.Itoa(DefaultCoordGracefulStopTimeout), + Doc: "seconds. force stop node without graceful stop", + Export: true, + } + p.GracefulStopTimeout.Init(base.mgr) } // ///////////////////////////////////////////////////////////////////////////// @@ -959,6 +971,8 @@ type proxyConfig struct { PartitionNameRegexp ParamItem `refreshable:"true"` AccessLog AccessLogConfig + + GracefulStopTimeout ParamItem `refreshable:"true"` } func (p *proxyConfig) init(base *BaseTable) { @@ -1251,6 +1265,15 @@ please adjust in embedded Milvus: false`, Doc: "switch for whether proxy shall use partition name as regexp when searching", } p.PartitionNameRegexp.Init(base.mgr) + + p.GracefulStopTimeout = ParamItem{ + Key: "proxy.gracefulStopTimeout", + Version: "2.3.7", + DefaultValue: strconv.Itoa(DefaultProxyGracefulStopTimeout), + Doc: "seconds. force stop node without graceful stop", + Export: true, + } + p.GracefulStopTimeout.Init(base.mgr) } // ///////////////////////////////////////////////////////////////////////////// @@ -1314,6 +1337,7 @@ type queryCoordConfig struct { ObserverTaskParallel ParamItem `refreshable:"false"` CheckAutoBalanceConfigInterval ParamItem `refreshable:"false"` CheckNodeSessionInterval ParamItem `refreshable:"false"` + GracefulStopTimeout ParamItem `refreshable:"true"` } func (p *queryCoordConfig) init(base *BaseTable) { @@ -1702,6 +1726,15 @@ func (p *queryCoordConfig) init(base *BaseTable) { Export: true, } p.HeartBeatWarningLag.Init(base.mgr) + + p.GracefulStopTimeout = ParamItem{ + Key: "queryCoord.gracefulStopTimeout", + Version: "2.3.7", + DefaultValue: strconv.Itoa(DefaultCoordGracefulStopTimeout), + Doc: "seconds. force stop node without graceful stop", + Export: true, + } + p.GracefulStopTimeout.Init(base.mgr) } // ///////////////////////////////////////////////////////////////////////////// @@ -2243,6 +2276,8 @@ type dataCoordConfig struct { // auto balance channel on datanode AutoBalance ParamItem `refreshable:"true"` CheckAutoBalanceConfigInterval ParamItem `refreshable:"false"` + + GracefulStopTimeout ParamItem `refreshable:"true"` } func (p *dataCoordConfig) init(base *BaseTable) { @@ -2637,6 +2672,15 @@ During compaction, the size of segment # of rows is able to exceed segment max # Export: true, } p.AutoUpgradeSegmentIndex.Init(base.mgr) + + p.GracefulStopTimeout = ParamItem{ + Key: "dataCoord.gracefulStopTimeout", + Version: "2.3.7", + DefaultValue: strconv.Itoa(DefaultCoordGracefulStopTimeout), + Doc: "seconds. force stop node without graceful stop", + Export: true, + } + p.GracefulStopTimeout.Init(base.mgr) } // ///////////////////////////////////////////////////////////////////////////// @@ -2687,6 +2731,8 @@ type dataNodeConfig struct { ChannelWorkPoolSize ParamItem `refreshable:"true"` UpdateChannelCheckpointMaxParallel ParamItem `refreshable:"true"` + + GracefulStopTimeout ParamItem `refreshable:"true"` } func (p *dataNodeConfig) init(base *BaseTable) { @@ -2903,6 +2949,15 @@ func (p *dataNodeConfig) init(base *BaseTable) { DefaultValue: "1000", } p.UpdateChannelCheckpointMaxParallel.Init(base.mgr) + + p.GracefulStopTimeout = ParamItem{ + Key: "datanode.gracefulStopTimeout", + Version: "2.3.7", + DefaultValue: strconv.Itoa(DefaultGracefulStopTimeout), + Doc: "seconds. force stop node without graceful stop", + Export: true, + } + p.GracefulStopTimeout.Init(base.mgr) } // ///////////////////////////////////////////////////////////////////////////// @@ -2914,7 +2969,7 @@ type indexNodeConfig struct { DiskCapacityLimit ParamItem `refreshable:"true"` MaxDiskUsagePercentage ParamItem `refreshable:"true"` - GracefulStopTimeout ParamItem `refreshable:"false"` + GracefulStopTimeout ParamItem `refreshable:"true"` } func (p *indexNodeConfig) init(base *BaseTable) { @@ -2969,6 +3024,7 @@ func (p *indexNodeConfig) init(base *BaseTable) { Key: "indexNode.gracefulStopTimeout", Version: "2.2.1", FallbackKeys: []string{"common.gracefulStopTimeout"}, + Doc: "seconds. force stop node without graceful stop", Export: true, } p.GracefulStopTimeout.Init(base.mgr) diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index e5eb4fe0d6783..e6506624e96ad 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -125,6 +125,9 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, Params.EnableActiveStandby.GetAsBool(), false) t.Logf("rootCoord EnableActiveStandby = %t", Params.EnableActiveStandby.GetAsBool()) + params.Save("rootCoord.gracefulStopTimeout", "100") + assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second)) + SetCreateTime(time.Now()) SetUpdateTime(time.Now()) }) @@ -167,6 +170,9 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, Params.CostMetricsExpireTime.GetAsInt(), 1000) assert.Equal(t, Params.RetryTimesOnReplica.GetAsInt(), 2) assert.EqualValues(t, Params.HealthCheckTimeout.GetAsInt64(), 3000) + + params.Save("proxy.gracefulStopTimeout", "100") + assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second)) }) // t.Run("test proxyConfig panic", func(t *testing.T) { @@ -285,6 +291,9 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, true, Params.AutoBalance.GetAsBool()) assert.Equal(t, true, Params.AutoBalanceChannel.GetAsBool()) assert.Equal(t, 10, Params.CheckAutoBalanceConfigInterval.GetAsInt()) + + params.Save("queryCoord.gracefulStopTimeout", "100") + assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second)) }) t.Run("test queryNodeConfig", func(t *testing.T) { @@ -350,6 +359,9 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, int64(100), gracefulStopTimeout.GetAsInt64()) assert.Equal(t, false, Params.EnableWorkerSQCostMetrics.GetAsBool()) + + params.Save("querynode.gracefulStopTimeout", "100") + assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second)) }) t.Run("test dataCoordConfig", func(t *testing.T) { @@ -362,6 +374,9 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, true, Params.AutoBalance.GetAsBool()) assert.Equal(t, 10, Params.CheckAutoBalanceConfigInterval.GetAsInt()) assert.Equal(t, false, Params.AutoUpgradeSegmentIndex.GetAsBool()) + + params.Save("datacoord.gracefulStopTimeout", "100") + assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second)) }) t.Run("test dataNodeConfig", func(t *testing.T) { @@ -408,12 +423,18 @@ func TestComponentParam(t *testing.T) { updateChannelCheckpointMaxParallel := Params.UpdateChannelCheckpointMaxParallel.GetAsInt() t.Logf("updateChannelCheckpointMaxParallel: %d", updateChannelCheckpointMaxParallel) assert.Equal(t, 1000, Params.UpdateChannelCheckpointMaxParallel.GetAsInt()) + + params.Save("datanode.gracefulStopTimeout", "100") + assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second)) }) t.Run("test indexNodeConfig", func(t *testing.T) { Params := ¶ms.IndexNodeCfg params.Save(Params.GracefulStopTimeout.Key, "50") assert.Equal(t, Params.GracefulStopTimeout.GetAsInt64(), int64(50)) + + params.Save("indexnode.gracefulStopTimeout", "100") + assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second)) }) t.Run("channel config priority", func(t *testing.T) {