diff --git a/cmd/milvus/util.go b/cmd/milvus/util.go
index 35040094aeb5f..70707d64eddb6 100644
--- a/cmd/milvus/util.go
+++ b/cmd/milvus/util.go
@@ -150,7 +150,9 @@ func GetMilvusRoles(args []string, flags *flag.FlagSet) *roles.MilvusRoles {
         role.EnableIndexNode = true
     case typeutil.StreamingNodeRole:
         streamingutil.MustEnableStreamingService()
+        streamingutil.EnableEmbededQueryNode()
         role.EnableStreamingNode = true
+        role.EnableQueryNode = true
     case typeutil.StandaloneRole, typeutil.EmbeddedRole:
         role.EnableRootCoord = true
         role.EnableProxy = true
@@ -175,6 +177,10 @@ func GetMilvusRoles(args []string, flags *flag.FlagSet) *roles.MilvusRoles {
         role.EnableIndexNode = enableIndexNode
         role.EnableProxy = enableProxy
         role.EnableStreamingNode = enableStreamingNode
+        if enableStreamingNode && !enableQueryNode {
+            role.EnableQueryNode = true
+            streamingutil.EnableEmbededQueryNode()
+        }
     default:
         fmt.Fprintf(os.Stderr, "Unknown server type = %s\n%s", serverType, getHelp())
         os.Exit(-1)
diff --git a/internal/coordinator/snmanager/streaming_node_manager.go b/internal/coordinator/snmanager/streaming_node_manager.go
new file mode 100644
index 0000000000000..e0e09a9d59069
--- /dev/null
+++ b/internal/coordinator/snmanager/streaming_node_manager.go
@@ -0,0 +1,106 @@
+package snmanager
+
+import (
+    "context"
+    "sync"
+
+    "github.com/cockroachdb/errors"
+    "go.uber.org/zap"
+
+    "github.com/milvus-io/milvus/internal/streamingcoord/server/balancer"
+    "github.com/milvus-io/milvus/pkg/log"
+    "github.com/milvus-io/milvus/pkg/streaming/util/types"
+    "github.com/milvus-io/milvus/pkg/util/funcutil"
+    "github.com/milvus-io/milvus/pkg/util/syncutil"
+    "github.com/milvus-io/milvus/pkg/util/typeutil"
+)
+
+var StaticStreamingNodeManager = newStreamingNodeManager()
+
+func newStreamingNodeManager() *StreamingNodeManager {
+    snm := &StreamingNodeManager{
+        notifier:            syncutil.NewAsyncTaskNotifier[struct{}](),
+        balancer:            syncutil.NewFuture[balancer.Balancer](),
+        cond:                syncutil.NewContextCond(&sync.Mutex{}),
+        latestAssignments:   make(map[string]types.PChannelInfoAssigned),
+        streamingNodes:      typeutil.NewUniqueSet(),
+        nodeChangedNotifier: syncutil.NewVersionedNotifier(),
+    }
+    go snm.execute()
+    return snm
+}
+
+// StreamingNodeManager manages the query nodes that are embedded into streaming nodes.
+// StreamingNodeManager is exclusive with ResourceManager.
+type StreamingNodeManager struct {
+    notifier *syncutil.AsyncTaskNotifier[struct{}]
+    balancer *syncutil.Future[balancer.Balancer]
+    // The coord is merged after 2.6, so we don't need to make distribution safe.
+    cond                *syncutil.ContextCond
+    latestAssignments   map[string]types.PChannelInfoAssigned // the latest assignment info fetched from the streaming coord balance module.
+    streamingNodes      typeutil.UniqueSet
+    nodeChangedNotifier *syncutil.VersionedNotifier // notifies when the node set in the streaming node manager changes.
+}
+
+// GetWALLocated returns the server id of the node where the WAL of the vChannel is located.
+func (s *StreamingNodeManager) GetWALLocated(vChannel string) int64 {
+    pchannel := funcutil.ToPhysicalChannel(vChannel)
+    var targetServerID int64
+
+    s.cond.L.Lock()
+    for {
+        if assignment, ok := s.latestAssignments[pchannel]; ok {
+            targetServerID = assignment.Node.ServerID
+            break
+        }
+        s.cond.Wait(context.Background())
+    }
+    s.cond.L.Unlock()
+    return targetServerID
+}
+
+// GetStreamingQueryNodeIDs returns the server ids of the streaming query nodes.
+func (s *StreamingNodeManager) GetStreamingQueryNodeIDs() typeutil.UniqueSet {
+    s.cond.L.Lock()
+    defer s.cond.L.Unlock()
+    return s.streamingNodes.Clone()
+}
+
+// ListenNodeChanged returns a listener for node changed events.
+func (s *StreamingNodeManager) ListenNodeChanged() *syncutil.VersionedListener {
+    return s.nodeChangedNotifier.Listen(syncutil.VersionedListenAtEarliest)
+}
+
+// SetBalancerReady marks the balancer ready for the streaming node manager at streamingcoord initialization.
+func (s *StreamingNodeManager) SetBalancerReady(b balancer.Balancer) {
+    s.balancer.Set(b)
+}
+
+func (s *StreamingNodeManager) execute() (err error) {
+    defer s.notifier.Finish(struct{}{})
+
+    balancer, err := s.balancer.GetWithContext(s.notifier.Context())
+    if err != nil {
+        return errors.Wrap(err, "failed to wait balancer ready")
+    }
+    for {
+        if err := balancer.WatchChannelAssignments(s.notifier.Context(), func(
+            version typeutil.VersionInt64Pair,
+            relations []types.PChannelInfoAssigned,
+        ) error {
+            s.cond.LockAndBroadcast()
+            s.latestAssignments = make(map[string]types.PChannelInfoAssigned)
+            s.streamingNodes = typeutil.NewUniqueSet()
+            for _, relation := range relations {
+                s.latestAssignments[relation.Channel.Name] = relation
+                s.streamingNodes.Insert(relation.Node.ServerID)
+            }
+            s.nodeChangedNotifier.NotifyAll()
+            log.Info("streaming node manager updated", zap.Any("assignments", s.latestAssignments), zap.Any("streamingNodes", s.streamingNodes))
+            s.cond.L.Unlock()
+            return nil
+        }); err != nil {
+            return err
+        }
+    }
+}
diff --git a/internal/proto/query_coord.proto b/internal/proto/query_coord.proto
index f05a07dc7b028..05d019f52ec82 100644
--- a/internal/proto/query_coord.proto
+++ b/internal/proto/query_coord.proto
@@ -688,11 +688,18 @@ message ChannelNodeInfo {
 message Replica {
   int64 ID = 1;
   int64 collectionID = 2;
+  // nodes and ro_nodes can only load sealed segments.
+  // They only manage the legacy querynodes that are not embedded in the streamingnode.
   repeated int64 nodes = 3; // all (read and write) nodes. mutual exclusive with ro_nodes.
   string resource_group = 4;
   repeated int64 ro_nodes = 5; // the in-using node but should not be assigned to these replica.
-  // can not load new channel or segment on it anymore.
-  map<string, ChannelNodeInfo> channel_node_infos = 6;
+  // cannot load segment on it anymore.
+  map<string, ChannelNodeInfo> channel_node_infos = 6;
+  // rw_sq_nodes and ro_sq_nodes can only watch channels and assign segments; they will be removed in 3.0.
+  // They only manage the querynodes embedded in the streamingnode.
+  repeated int64 rw_sq_nodes = 7; // all (read and write) nodes. mutually exclusive with ro_sq_nodes.
+  repeated int64 ro_sq_nodes = 8; // the in-using node but should not be assigned to this replica.
+  // cannot watch channels on it anymore.
 }
 
 enum SyncType {
diff --git a/internal/querycoordv2/balance/balance.go b/internal/querycoordv2/balance/balance.go
index e3fd1e5121384..49197f194cff0 100644
--- a/internal/querycoordv2/balance/balance.go
+++ b/internal/querycoordv2/balance/balance.go
@@ -119,22 +119,25 @@ func (b *RoundRobinBalancer) AssignChannel(ctx context.Context, collectionID int
     if len(nodesInfo) == 0 {
         return nil
     }
+
+    channels, plans, scoreDelta := assignChannelToWALLocatedFirstForNodeInfo(channels, nodesInfo)
+
     sort.Slice(nodesInfo, func(i, j int) bool {
         cnt1, cnt2 := nodesInfo[i].ChannelCnt(), nodesInfo[j].ChannelCnt()
         id1, id2 := nodesInfo[i].ID(), nodesInfo[j].ID()
-        delta1, delta2 := b.scheduler.GetChannelTaskDelta(id1, -1), b.scheduler.GetChannelTaskDelta(id2, -1)
+        delta1, delta2 := b.scheduler.GetChannelTaskDelta(id1, -1)-scoreDelta[id1], b.scheduler.GetChannelTaskDelta(id2, -1)-scoreDelta[id2]
         return cnt1+delta1 < cnt2+delta2
     })
-    ret := make([]ChannelAssignPlan, 0, len(channels))
+
     for i, c := range channels {
         plan := ChannelAssignPlan{
             Channel: c,
             From:    -1,
             To:      nodesInfo[i%len(nodesInfo)].ID(),
         }
-        ret = append(ret, plan)
+        plans = append(plans, plan)
     }
-    return ret
+    return plans
 }
 
 func (b *RoundRobinBalancer) BalanceReplica(ctx context.Context, replica *meta.Replica) ([]SegmentAssignPlan, []ChannelAssignPlan) {
diff --git a/internal/querycoordv2/balance/channel_level_score_balancer.go b/internal/querycoordv2/balance/channel_level_score_balancer.go
index 92d12f0b1f20f..ddc1d95981337 100644
--- a/internal/querycoordv2/balance/channel_level_score_balancer.go
+++ b/internal/querycoordv2/balance/channel_level_score_balancer.go
@@ -81,6 +81,7 @@ func (b *ChannelLevelScoreBalancer) BalanceReplica(ctx context.Context, replica
         return b.ScoreBasedBalancer.BalanceReplica(ctx, replica)
     }
 
+    // TODO: assign by channel
     channelPlans = make([]ChannelAssignPlan, 0)
     segmentPlans = make([]SegmentAssignPlan, 0)
     for channelName := range channels {
diff --git a/internal/querycoordv2/balance/multi_target_balance.go b/internal/querycoordv2/balance/multi_target_balance.go
index ca078d5992743..cc60d39b9683e 100644
--- a/internal/querycoordv2/balance/multi_target_balance.go
+++ b/internal/querycoordv2/balance/multi_target_balance.go
@@ -485,24 +485,48 @@ func (b *MultiTargetBalancer) BalanceReplica(ctx context.Context, replica *meta.
         }
     }()
-    if replica.NodesCount() == 0 {
-        return nil, nil
+    stoppingBalance := paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool()
+
+    channelPlans = b.balanceChannels(ctx, br, replica, stoppingBalance)
+    if len(channelPlans) == 0 {
+        segmentPlans = b.balanceSegments(ctx, replica, stoppingBalance)
+    }
+    return
+}
+
+func (b *MultiTargetBalancer) balanceChannels(ctx context.Context, br *balanceReport, replica *meta.Replica, stoppingBalance bool) []ChannelAssignPlan {
+    rwNodes := replica.GetRWSQNodes()
+    roNodes := replica.GetROSQNodes()
+    if len(rwNodes) == 0 || !b.permitBalanceChannel(replica.GetCollectionID()) {
+        return nil
+    }
+
+    if len(roNodes) != 0 {
+        if !stoppingBalance {
+            log.RatedInfo(10, "stopping balance is disabled!", zap.Int64s("stoppingNode", roNodes))
+            return nil
+        }
+        return b.genStoppingChannelPlan(ctx, replica, rwNodes, roNodes)
     }
+    if paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() {
+        return b.genChannelPlan(ctx, br, replica, rwNodes)
+    }
+    return nil
+}
+
+func (b *MultiTargetBalancer) balanceSegments(ctx context.Context, replica *meta.Replica, stoppingBalance bool) []SegmentAssignPlan {
     rwNodes := replica.GetRWNodes()
     roNodes := replica.GetRONodes()
-    if len(rwNodes) == 0 {
-        // no available nodes to balance
-        return nil, nil
+    if len(rwNodes) == 0 || !b.permitBalanceSegment(replica.GetCollectionID()) {
+        return nil
     }
-    // print current distribution before generating plans
-    segmentPlans, channelPlans = make([]SegmentAssignPlan, 0), make([]ChannelAssignPlan, 0)
     if len(roNodes) != 0 {
-        if !paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool() {
+        if !stoppingBalance {
             log.RatedInfo(10, "stopping balance is disabled!", zap.Int64s("stoppingNode", roNodes))
-            return nil, nil
+            return nil
         }
 
         log.Info("Handle stopping nodes",
@@ -510,23 +534,9 @@ func (b *MultiTargetBalancer) BalanceReplica(ctx context.Context, replica *meta.
             zap.Any("available nodes", rwNodes),
         )
         // handle stopped nodes here, have to assign segments on stopping nodes to nodes with the smallest score
-        if b.permitBalanceChannel(replica.GetCollectionID()) {
-            channelPlans = append(channelPlans, b.genStoppingChannelPlan(ctx, replica, rwNodes, roNodes)...)
-        }
-        if len(channelPlans) == 0 && b.permitBalanceSegment(replica.GetCollectionID()) {
-            segmentPlans = append(segmentPlans, b.genStoppingSegmentPlan(ctx, replica, rwNodes, roNodes)...)
-        }
-    } else {
-        if paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() && b.permitBalanceChannel(replica.GetCollectionID()) {
-            channelPlans = append(channelPlans, b.genChannelPlan(ctx, br, replica, rwNodes)...)
-        }
-
-        if len(channelPlans) == 0 && b.permitBalanceSegment(replica.GetCollectionID()) {
-            segmentPlans = b.genSegmentPlan(ctx, replica, rwNodes)
-        }
+        return b.genStoppingSegmentPlan(ctx, replica, rwNodes, roNodes)
     }
-
-    return segmentPlans, channelPlans
+    return b.genSegmentPlan(ctx, replica, rwNodes)
 }
 
 func (b *MultiTargetBalancer) genSegmentPlan(ctx context.Context, replica *meta.Replica, rwNodes []int64) []SegmentAssignPlan {
diff --git a/internal/querycoordv2/balance/rowcount_based_balancer.go b/internal/querycoordv2/balance/rowcount_based_balancer.go
index d69a676c46c74..bd0889e519b40 100644
--- a/internal/querycoordv2/balance/rowcount_based_balancer.go
+++ b/internal/querycoordv2/balance/rowcount_based_balancer.go
@@ -103,12 +103,14 @@ func (b *RowCountBasedBalancer) AssignChannel(ctx context.Context, collectionID
     if len(nodeItems) == 0 {
         return nil
     }
+
+    channels, plans := assignChannelToWALLocatedFirst(channels, nodeItems)
+
     queue := newPriorityQueue()
     for _, item := range nodeItems {
         queue.push(item)
     }
 
-    plans := make([]ChannelAssignPlan, 0, len(channels))
     for _, c := range channels {
         // pick the node with the least channel num and allocate to it.
         ni := queue.pop().(*nodeItem)
@@ -181,22 +183,49 @@ func (b *RowCountBasedBalancer) BalanceReplica(ctx context.Context, replica *met
             log.Info("balance plan generated", zap.Stringers("report details", br.records))
         }
     }()
-    if replica.NodesCount() == 0 {
-        return nil, nil
+
+    stoppingBalance := paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool()
+
+    channelPlans = b.balanceChannels(ctx, br, replica, stoppingBalance)
+    if len(channelPlans) == 0 {
+        segmentPlans = b.balanceSegments(ctx, replica, stoppingBalance)
+    }
+    return
+}
+
+func (b *RowCountBasedBalancer) balanceChannels(ctx context.Context, br *balanceReport, replica *meta.Replica, stoppingBalance bool) []ChannelAssignPlan {
+    rwNodes := replica.GetRWSQNodes()
+    roNodes := replica.GetROSQNodes()
+    if len(rwNodes) == 0 || !b.permitBalanceChannel(replica.GetCollectionID()) {
+        return nil
+    }
+
+    if len(roNodes) != 0 {
+        if !stoppingBalance {
+            log.RatedInfo(10, "stopping balance is disabled!", zap.Int64s("stoppingNode", roNodes))
+            return nil
+        }
+        return b.genStoppingChannelPlan(ctx, replica, rwNodes, roNodes)
     }
+    if paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() {
+        return b.genChannelPlan(ctx, br, replica, rwNodes)
+    }
+    return nil
+}
+
+func (b *RowCountBasedBalancer) balanceSegments(ctx context.Context, replica *meta.Replica, stoppingBalance bool) []SegmentAssignPlan {
     rwNodes := replica.GetRWNodes()
     roNodes := replica.GetRONodes()
-    if len(rwNodes) == 0 {
-        // no available nodes to balance
-        return nil, nil
-    }
-    segmentPlans, channelPlans = make([]SegmentAssignPlan, 0), make([]ChannelAssignPlan, 0)
+    if len(rwNodes) == 0 || !b.permitBalanceSegment(replica.GetCollectionID()) {
+        return nil
+    }
+
     // print current distribution before generating plans
     if len(roNodes) != 0 {
-        if !paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool() {
+        if !stoppingBalance {
             log.RatedInfo(10, "stopping balance is disabled!", zap.Int64s("stoppingNode", roNodes))
-            return nil, nil
+            return nil
         }
 
         log.Info("Handle stopping nodes",
@@ -204,24 +233,9 @@ func (b *RowCountBasedBalancer) BalanceReplica(ctx context.Context, replica *met
             zap.Any("available nodes", rwNodes),
         )
         // handle stopped nodes here, have to assign segments on stopping nodes to nodes with the smallest score
-        if b.permitBalanceChannel(replica.GetCollectionID()) {
-            channelPlans = append(channelPlans, b.genStoppingChannelPlan(ctx, replica, rwNodes, roNodes)...)
-        }
-
-        if len(channelPlans) == 0 && b.permitBalanceSegment(replica.GetCollectionID()) {
-            segmentPlans = append(segmentPlans, b.genStoppingSegmentPlan(ctx, replica, rwNodes, roNodes)...)
-        }
-    } else {
-        if paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() && b.permitBalanceChannel(replica.GetCollectionID()) {
-            channelPlans = append(channelPlans, b.genChannelPlan(ctx, br, replica, rwNodes)...)
-        }
-
-        if len(channelPlans) == 0 && b.permitBalanceSegment(replica.GetCollectionID()) {
-            segmentPlans = append(segmentPlans, b.genSegmentPlan(ctx, replica, rwNodes)...)
-        }
+        return b.genStoppingSegmentPlan(ctx, replica, rwNodes, roNodes)
     }
-
-    return segmentPlans, channelPlans
+    return b.genSegmentPlan(ctx, replica, rwNodes)
 }
 
 func (b *RowCountBasedBalancer) genStoppingSegmentPlan(ctx context.Context, replica *meta.Replica, rwNodes []int64, roNodes []int64) []SegmentAssignPlan {
diff --git a/internal/querycoordv2/balance/score_based_balancer.go b/internal/querycoordv2/balance/score_based_balancer.go
index f7842a4ce4ce3..e410332b247ba 100644
--- a/internal/querycoordv2/balance/score_based_balancer.go
+++ b/internal/querycoordv2/balance/score_based_balancer.go
@@ -439,52 +439,72 @@ func (b *ScoreBasedBalancer) BalanceReplica(ctx context.Context, replica *meta.R
             log.Info("balance plan generated", zap.Stringers("nodesInfo", br.NodesInfo()), zap.Stringers("report details", br.records))
         }
     }()
+
     if replica.NodesCount() == 0 {
         br.AddRecord(StrRecord("replica has no querynode"))
         return nil, nil
     }
 
+    stoppingBalance := paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool()
+
+    channelPlans = b.balanceChannels(ctx, br, replica, stoppingBalance)
+    if len(channelPlans) == 0 {
+        segmentPlans = b.balanceSegments(ctx, br, replica, stoppingBalance)
+    }
+    return
+}
+
+func (b *ScoreBasedBalancer) balanceChannels(ctx context.Context, br *balanceReport, replica *meta.Replica, stoppingBalance bool) []ChannelAssignPlan {
+    rwNodes := replica.GetRWSQNodes()
+    roNodes := replica.GetROSQNodes()
+    if len(rwNodes) == 0 || !b.permitBalanceChannel(replica.GetCollectionID()) {
+        return nil
+    }
+
+    if len(roNodes) != 0 {
+        if !stoppingBalance {
+            log.RatedInfo(10, "stopping balance is disabled!", zap.Int64s("stoppingNode", roNodes))
+            br.AddRecord(StrRecord("stopping balance is disabled"))
+            return nil
+        }
+
+        br.AddRecord(StrRecordf("executing stopping balance: %v", roNodes))
+        return b.genStoppingChannelPlan(ctx, replica, rwNodes, roNodes)
+    }
+
+    if paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() {
+        return b.genChannelPlan(ctx, br, replica, rwNodes)
+    }
+    return nil
+}
+
+func (b *ScoreBasedBalancer) balanceSegments(ctx context.Context, br *balanceReport, replica *meta.Replica, stoppingBalance bool) []SegmentAssignPlan {
     rwNodes := replica.GetRWNodes()
     roNodes := replica.GetRONodes()
     if len(rwNodes) == 0 {
         // no available nodes to balance
         br.AddRecord(StrRecord("no rwNodes to balance"))
-        return nil, nil
+        return nil
+    }
+    if !b.permitBalanceSegment(replica.GetCollectionID()) {
+        return nil
     }
-    // print current distribution before generating plans
-    segmentPlans, channelPlans = make([]SegmentAssignPlan, 0), make([]ChannelAssignPlan, 0)
     if len(roNodes) != 0 {
-        if !paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool() {
+        if !stoppingBalance {
             log.RatedInfo(10, "stopping balance is disabled!", zap.Int64s("stoppingNode", roNodes))
-            br.AddRecord(StrRecord("stopping balance is disabled"))
-            return nil, nil
+            return nil
         }
 
         log.Info("Handle stopping nodes",
             zap.Any("stopping nodes", roNodes),
             zap.Any("available nodes", rwNodes),
         )
-        br.AddRecord(StrRecordf("executing stopping balance: %v", roNodes))
         // handle stopped nodes here, have to assign segments on stopping nodes to nodes with the smallest score
-        if b.permitBalanceChannel(replica.GetCollectionID()) {
-            channelPlans = append(channelPlans, b.genStoppingChannelPlan(ctx, replica, rwNodes, roNodes)...)
-        }
-        if len(channelPlans) == 0 && b.permitBalanceSegment(replica.GetCollectionID()) {
-            segmentPlans = append(segmentPlans, b.genStoppingSegmentPlan(ctx, replica, rwNodes, roNodes)...)
-        }
-    } else {
-        if paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() && b.permitBalanceChannel(replica.GetCollectionID()) {
-            channelPlans = append(channelPlans, b.genChannelPlan(ctx, br, replica, rwNodes)...)
-        }
-
-        if len(channelPlans) == 0 && b.permitBalanceSegment(replica.GetCollectionID()) {
-            segmentPlans = append(segmentPlans, b.genSegmentPlan(ctx, br, replica, rwNodes)...)
-        }
+        return b.genStoppingSegmentPlan(ctx, replica, rwNodes, roNodes)
     }
-
-    return segmentPlans, channelPlans
+    return b.genSegmentPlan(ctx, br, replica, rwNodes)
 }
 
 func (b *ScoreBasedBalancer) genStoppingSegmentPlan(ctx context.Context, replica *meta.Replica, onlineNodes []int64, offlineNodes []int64) []SegmentAssignPlan {
diff --git a/internal/querycoordv2/balance/streaming_query_node_channel_helper.go b/internal/querycoordv2/balance/streaming_query_node_channel_helper.go
new file mode 100644
index 0000000000000..f1771009f4b69
--- /dev/null
+++ b/internal/querycoordv2/balance/streaming_query_node_channel_helper.go
@@ -0,0 +1,66 @@
+package balance
+
+import (
+    "github.com/milvus-io/milvus/internal/coordinator/snmanager"
+    "github.com/milvus-io/milvus/internal/querycoordv2/meta"
+    "github.com/milvus-io/milvus/internal/querycoordv2/session"
+)
+
+func assignChannelToWALLocatedFirst(
+    channels []*meta.DmChannel,
+    nodeItems []*nodeItem,
+) (notFoundChannels []*meta.DmChannel, plans []ChannelAssignPlan) {
+    plans = make([]ChannelAssignPlan, 0)
+    notFoundChannels = make([]*meta.DmChannel, 0)
+    for _, c := range channels {
+        nodeID := snmanager.StaticStreamingNodeManager.GetWALLocated(c.GetChannelName())
+        // Check if nodeID is in the list of nodeItems
+        found := false
+        for _, item := range nodeItems {
+            if item.nodeID == nodeID {
+                item.AddCurrentScoreDelta(-1)
+                plans = append(plans, ChannelAssignPlan{
+                    From:    -1,
+                    To:      item.nodeID,
+                    Channel: c,
+                })
+                found = true
+                break
+            }
+        }
+        if !found {
+            notFoundChannels = append(notFoundChannels, c)
+        }
+    }
+    return notFoundChannels, plans
+}
+
+func assignChannelToWALLocatedFirstForNodeInfo(
+    channels []*meta.DmChannel,
+    nodeItems []*session.NodeInfo,
+) (notFoundChannels []*meta.DmChannel, plans []ChannelAssignPlan, scoreDelta map[int64]int) {
+    plans = make([]ChannelAssignPlan, 0)
+    notFoundChannels = make([]*meta.DmChannel, 0)
+    scoreDelta = make(map[int64]int)
+    for _, c := range channels {
+        nodeID := snmanager.StaticStreamingNodeManager.GetWALLocated(c.GetChannelName())
+        // Check if nodeID is in the list of nodeItems
+        found := false
+        for _, item := range nodeItems {
+            if item.ID() == nodeID {
+                plans = append(plans, ChannelAssignPlan{
+                    From:    -1,
+                    To:      item.ID(),
+                    Channel: c,
+                })
+                found = true
+                scoreDelta[item.ID()] += 1
+                break
+            }
+        }
+        if !found {
+            notFoundChannels = append(notFoundChannels, c)
+        }
+    }
+    return notFoundChannels, plans, scoreDelta
+}
diff --git a/internal/querycoordv2/checkers/channel_checker.go b/internal/querycoordv2/checkers/channel_checker.go
index 231c1f8dfdf15..1acde66e87def 100644
--- a/internal/querycoordv2/checkers/channel_checker.go
+++ b/internal/querycoordv2/checkers/channel_checker.go
@@ -228,11 +228,8 @@ func (c *ChannelChecker) findRepeatedChannels(ctx context.Context, replicaID int
 func (c *ChannelChecker) createChannelLoadTask(ctx context.Context, channels []*meta.DmChannel, replica *meta.Replica) []task.Task {
     plans := make([]balance.ChannelAssignPlan, 0)
     for _, ch := range channels {
-        rwNodes := replica.GetChannelRWNodes(ch.GetChannelName())
-        if len(rwNodes) == 0 {
-            rwNodes = replica.GetRWNodes()
-        }
-        plan := c.getBalancerFunc().AssignChannel(ctx, replica.GetCollectionID(), []*meta.DmChannel{ch}, rwNodes, true)
+        rwSQNodes := replica.GetRWSQNodes()
+        plan := c.getBalancerFunc().AssignChannel(ctx, replica.GetCollectionID(), []*meta.DmChannel{ch}, rwSQNodes, true)
         plans = append(plans, plan...)
     }
diff --git a/internal/querycoordv2/meta/replica.go b/internal/querycoordv2/meta/replica.go
index 217dfc7144747..d70af0deda69b 100644
--- a/internal/querycoordv2/meta/replica.go
+++ b/internal/querycoordv2/meta/replica.go
@@ -18,13 +18,23 @@ var NilReplica = newReplica(&querypb.Replica{
 // So only read only operations are allowed on these type.
 type Replica struct {
     replicaPB *querypb.Replica
-    rwNodes   typeutil.UniqueSet // a helper field for manipulating replica's Available Nodes slice field.
+    // Nodes is the legacy querynode that is not embedded in the streamingnode; it can only load sealed segments.
+    rwNodes typeutil.UniqueSet // a helper field for manipulating replica's Available Nodes slice field.
     // always keep consistent with replicaPB.Nodes.
     // mutual exclusive with roNodes.
     roNodes typeutil.UniqueSet // a helper field for manipulating replica's RO Nodes slice field.
     // always keep consistent with replicaPB.RoNodes.
-    // node used by replica but cannot add more channel or segment ont it.
+    // node used by replica but cannot add segment on it.
     // include rebalance node or node out of resource group.
+
+    // SQNodes is the querynode that is embedded in the streamingnode; it can only watch channels and load growing segments.
+    rwSQNodes typeutil.UniqueSet // a helper field for manipulating replica's RW SQ Nodes slice field.
+    // always keep consistent with replicaPB.RwSqNodes.
+    // mutually exclusive with roSQNodes.
+    roSQNodes typeutil.UniqueSet // a helper field for manipulating replica's RO SQ Nodes slice field.
+    // always keep consistent with replicaPB.RoSqNodes.
+    // node used by replica but cannot add more channels on it.
+    // include the rebalance node.
 }
 
 // Deprecated: may break the consistency of ReplicaManager, use `Spawn` of `ReplicaManager` or `newReplica` instead.
@@ -44,6 +54,8 @@ func newReplica(replica *querypb.Replica) *Replica {
         replicaPB: proto.Clone(replica).(*querypb.Replica),
         rwNodes:   typeutil.NewUniqueSet(replica.Nodes...),
         roNodes:   typeutil.NewUniqueSet(replica.RoNodes...),
+        rwSQNodes: typeutil.NewUniqueSet(replica.RwSqNodes...),
+        roSQNodes: typeutil.NewUniqueSet(replica.RoSqNodes...),
     }
 }
 
@@ -68,6 +80,8 @@ func (replica *Replica) GetNodes() []int64 {
     nodes := make([]int64, 0)
     nodes = append(nodes, replica.replicaPB.GetRoNodes()...)
     nodes = append(nodes, replica.replicaPB.GetNodes()...)
+    nodes = append(nodes, replica.replicaPB.GetRwSqNodes()...)
+    nodes = append(nodes, replica.replicaPB.GetRoSqNodes()...)
     return nodes
 }
 
@@ -83,6 +97,18 @@ func (replica *Replica) GetRWNodes() []int64 {
     return replica.replicaPB.GetNodes()
 }
 
+// GetROSQNodes returns the ro sq nodes of the replica.
+// readonly, don't modify the returned slice.
+func (replica *Replica) GetROSQNodes() []int64 {
+    return replica.replicaPB.GetRoSqNodes()
+}
+
+// GetRWSQNodes returns the rw sq nodes of the replica.
+// readonly, don't modify the returned slice.
+func (replica *Replica) GetRWSQNodes() []int64 {
+    return replica.replicaPB.GetRwSqNodes()
+}
+
 // RangeOverRWNodes iterates over the read and write nodes of the replica.
 func (replica *Replica) RangeOverRWNodes(f func(node int64) bool) {
     replica.rwNodes.Range(f)
@@ -93,6 +119,16 @@ func (replica *Replica) RangeOverRONodes(f func(node int64) bool) {
     replica.roNodes.Range(f)
 }
 
+// RangeOverRWSQNodes iterates over the read and write streaming query nodes of the replica.
+func (replica *Replica) RangeOverRWSQNodes(f func(node int64) bool) {
+    replica.rwSQNodes.Range(f)
+}
+
+// RangeOverROSQNodes iterates over the ro streaming query nodes of the replica.
+func (replica *Replica) RangeOverROSQNodes(f func(node int64) bool) {
+    replica.roSQNodes.Range(f)
+}
+
 // RWNodesCount returns the count of rw nodes of the replica.
 func (replica *Replica) RWNodesCount() int {
     return replica.rwNodes.Len()
@@ -103,6 +139,16 @@ func (replica *Replica) RONodesCount() int {
     return replica.roNodes.Len()
 }
 
+// RWSQNodesCount returns the count of rw sq nodes of the replica.
+func (replica *Replica) RWSQNodesCount() int {
+    return replica.rwSQNodes.Len()
+}
+
+// ROSQNodesCount returns the count of ro sq nodes of the replica.
+func (replica *Replica) ROSQNodesCount() int {
+    return replica.roSQNodes.Len()
+}
+
 // NodesCount returns the count of rw nodes and ro nodes of the replica.
 func (replica *Replica) NodesCount() int {
     return replica.rwNodes.Len() + replica.roNodes.Len()
@@ -110,7 +156,7 @@ func (replica *Replica) NodesCount() int {
 
 // Contains checks if the node is in rw nodes of the replica.
 func (replica *Replica) Contains(node int64) bool {
-    return replica.ContainRONode(node) || replica.ContainRWNode(node)
+    return replica.ContainRONode(node) || replica.ContainRWNode(node) || replica.ContainSQNode(node) || replica.ContainRWSQNode(node)
 }
 
 // ContainRONode checks if the node is in ro nodes of the replica.
@@ -123,6 +169,21 @@ func (replica *Replica) ContainRWNode(node int64) bool {
     return replica.rwNodes.Contain(node)
 }
 
+// ContainSQNode checks if the node is in the ro sq nodes or rw sq nodes of the replica.
+func (replica *Replica) ContainSQNode(node int64) bool {
+    return replica.ContainROSQNode(node) || replica.ContainRWSQNode(node)
+}
+
+// ContainROSQNode checks if the node is in ro sq nodes of the replica.
+func (replica *Replica) ContainROSQNode(node int64) bool {
+    return replica.roSQNodes.Contain(node)
+}
+
+// ContainRWSQNode checks if the node is in rw sq nodes of the replica.
+func (replica *Replica) ContainRWSQNode(node int64) bool {
+    return replica.rwSQNodes.Contain(node)
+}
+
 // Deprecated: Warning, break the consistency of ReplicaManager, use `SetAvailableNodesInSameCollectionAndRG` in ReplicaManager instead.
 // TODO: removed in future, only for old unittest now.
 func (replica *Replica) AddRWNode(nodes ...int64) {
@@ -154,6 +215,8 @@ func (replica *Replica) CopyForWrite() *mutableReplica {
             replicaPB: proto.Clone(replica.replicaPB).(*querypb.Replica),
             rwNodes:   typeutil.NewUniqueSet(replica.replicaPB.Nodes...),
             roNodes:   typeutil.NewUniqueSet(replica.replicaPB.RoNodes...),
+            rwSQNodes: typeutil.NewUniqueSet(replica.replicaPB.RwSqNodes...),
+            roSQNodes: typeutil.NewUniqueSet(replica.replicaPB.RoSqNodes...),
         },
         exclusiveRWNodeToChannel: exclusiveRWNodeToChannel,
     }
@@ -209,6 +272,30 @@ func (replica *mutableReplica) RemoveNode(nodes ...int64) {
     replica.tryBalanceNodeForChannel()
 }
 
+// AddRWSQNode adds the nodes to the rw sq nodes of the replica.
+func (replica *mutableReplica) AddRWSQNode(nodes ...int64) {
+    replica.roSQNodes.Remove(nodes...)
+    replica.replicaPB.RoSqNodes = replica.roSQNodes.Collect()
+    replica.rwSQNodes.Insert(nodes...)
+    replica.replicaPB.RwSqNodes = replica.rwSQNodes.Collect()
+}
+
+// AddROSQNode adds the nodes to the ro sq nodes of the replica.
+func (replica *mutableReplica) AddROSQNode(nodes ...int64) {
+    replica.rwSQNodes.Remove(nodes...)
+    replica.replicaPB.RwSqNodes = replica.rwSQNodes.Collect()
+    replica.roSQNodes.Insert(nodes...)
+    replica.replicaPB.RoSqNodes = replica.roSQNodes.Collect()
+}
+
+// RemoveSQNode removes the nodes from the rw sq nodes and ro sq nodes of the replica.
+func (replica *mutableReplica) RemoveSQNode(nodes ...int64) {
+    replica.rwSQNodes.Remove(nodes...)
+    replica.replicaPB.RwSqNodes = replica.rwSQNodes.Collect()
+    replica.roSQNodes.Remove(nodes...)
+    replica.replicaPB.RoSqNodes = replica.roSQNodes.Collect()
+}
+
 func (replica *mutableReplica) removeChannelExclusiveNodes(nodes ...int64) {
     channelNodeMap := make(map[string][]int64)
     for _, nodeID := range nodes {
diff --git a/internal/querycoordv2/meta/replica_manager.go b/internal/querycoordv2/meta/replica_manager.go
index 2c04c52c7cddc..65fbc75135487 100644
--- a/internal/querycoordv2/meta/replica_manager.go
+++ b/internal/querycoordv2/meta/replica_manager.go
@@ -496,6 +496,21 @@ func (m *ReplicaManager) RemoveNode(ctx context.Context, replicaID typeutil.Uniq
     return m.put(ctx, mutableReplica.IntoReplica())
 }
 
+// RemoveSQNode removes the given sq nodes from the replica.
+func (m *ReplicaManager) RemoveSQNode(ctx context.Context, replicaID typeutil.UniqueID, nodes ...typeutil.UniqueID) error {
+    m.rwmutex.Lock()
+    defer m.rwmutex.Unlock()
+
+    replica, ok := m.replicas[replicaID]
+    if !ok {
+        return merr.WrapErrReplicaNotFound(replicaID)
+    }
+
+    mutableReplica := replica.CopyForWrite()
+    mutableReplica.RemoveSQNode(nodes...) // ro -> unused
+    return m.put(ctx, mutableReplica.IntoReplica())
+}
+
 func (m *ReplicaManager) GetResourceGroupByCollection(ctx context.Context, collection typeutil.UniqueID) typeutil.Set[string] {
     replicas := m.GetByCollection(ctx, collection)
     ret := typeutil.NewSet(lo.Map(replicas, func(r *Replica, _ int) string { return r.GetResourceGroup() })...)
@@ -531,3 +546,48 @@ func (m *ReplicaManager) GetReplicasJSON(ctx context.Context) string {
     }
     return string(ret)
 }
+
+// RecoverSQNodesInCollection recovers all sq nodes in the collection with the latest node list.
+// It promises a node will only be assigned to one replica of the same collection at a time.
+// 1. Move the rw nodes to ro nodes if the current replica uses too many sq nodes.
+// 2. Add new incoming nodes into the replica if they are not ro nodes of other replicas in the same collection.
+// 3. Replicas will share the nodes in the resource group fairly.
+func (m *ReplicaManager) RecoverSQNodesInCollection(ctx context.Context, collectionID int64, sqnNodeIDs typeutil.UniqueSet) error {
+    m.rwmutex.Lock()
+    defer m.rwmutex.Unlock()
+
+    collReplicas, ok := m.coll2Replicas[collectionID]
+    if !ok {
+        return errors.Errorf("collection %d not loaded", collectionID)
+    }
+
+    helper := newReplicaSQNAssignmentHelper(collReplicas.replicas, sqnNodeIDs)
+    helper.updateExpectedNodeCountForReplicas(len(sqnNodeIDs))
+
+    modifiedReplicas := make([]*Replica, 0)
+    // recover nodes by the given sqn node list.
+    helper.RangeOverReplicas(func(assignment *replicaAssignmentInfo) {
+        roNodes := assignment.GetNewRONodes()
+        recoverableNodes, incomingNodeCount := assignment.GetRecoverNodesAndIncomingNodeCount()
+        // There may not be enough incoming nodes for the current replica.
+        // Even if we filter out the nodes used by other replicas of the same collection in other resource groups,
+        // the current replica's expected nodes may still be used by other replicas of the same collection in the same resource group.
+        incomingNode := helper.AllocateIncomingNodes(incomingNodeCount)
+        if len(roNodes) == 0 && len(recoverableNodes) == 0 && len(incomingNode) == 0 {
+            // nothing to do.
+            return
+        }
+        mutableReplica := m.replicas[assignment.GetReplicaID()].CopyForWrite()
+        mutableReplica.AddROSQNode(roNodes...)          // rw -> ro
+        mutableReplica.AddRWSQNode(recoverableNodes...) // ro -> rw
+        mutableReplica.AddRWSQNode(incomingNode...)     // unused -> rw
+        log.Info(
+            "new replica recovery streaming query node found",
+            zap.Int64("replicaID", assignment.GetReplicaID()),
+            zap.Int64s("newRONodes", roNodes),
+            zap.Int64s("roToRWNodes", recoverableNodes),
+            zap.Int64s("newIncomingNodes", incomingNode))
+        modifiedReplicas = append(modifiedReplicas, mutableReplica.IntoReplica())
+    })
+    return m.put(ctx, modifiedReplicas...)
+}
diff --git a/internal/querycoordv2/meta/replica_manager_helper.go b/internal/querycoordv2/meta/replica_manager_helper.go
index 1bf6f8e8fb90d..be280d09591d5 100644
--- a/internal/querycoordv2/meta/replica_manager_helper.go
+++ b/internal/querycoordv2/meta/replica_manager_helper.go
@@ -179,6 +179,40 @@ func newReplicaAssignmentInfo(replica *Replica, nodeInRG typeutil.UniqueSet) *re
     }
 }
 
+func newReplicaSQNAssignmentInfo(replica *Replica, nodes typeutil.UniqueSet) *replicaAssignmentInfo {
+    // nodes in a replica can be split into three parts.
+    rwNodes := make(typeutil.UniqueSet, replica.RWSQNodesCount())
+    newRONodes := make(typeutil.UniqueSet, replica.ROSQNodesCount())
+    unrecoverableRONodes := make(typeutil.UniqueSet, replica.ROSQNodesCount())
+    recoverableRONodes := make(typeutil.UniqueSet, replica.ROSQNodesCount())
+
+    replica.RangeOverRWSQNodes(func(nodeID int64) bool {
+        if nodes.Contain(nodeID) {
+            rwNodes.Insert(nodeID)
+        } else {
+            newRONodes.Insert(nodeID)
+        }
+        return true
+    })
+
+    replica.RangeOverROSQNodes(func(nodeID int64) bool {
+        if nodes.Contain(nodeID) {
+            recoverableRONodes.Insert(nodeID)
+        } else {
+            unrecoverableRONodes.Insert(nodeID)
+        }
+        return true
+    })
+    return &replicaAssignmentInfo{
+        replicaID:            replica.GetID(),
+        expectedNodeCount:    0,
+        rwNodes:              rwNodes,
+        newRONodes:           newRONodes,
+        recoverableRONodes:   recoverableRONodes,
+        unrecoverableRONodes: unrecoverableRONodes,
+    }
+}
+
 type replicaAssignmentInfo struct {
     replicaID         typeutil.UniqueID
     expectedNodeCount int // expected node count for each replica.
@@ -236,6 +270,11 @@ func (s *replicaAssignmentInfo) GetRecoverNodesAndIncomingNodeCount() (recoverNo
     return recoverNodes, incomingNodeCount
 }
 
+// GetUnrecoverableNodes returns the unrecoverable ro nodes of this replica.
+func (s *replicaAssignmentInfo) GetUnrecoverableNodes() []int64 {
+    return s.unrecoverableRONodes.Collect()
+}
+
 // RangeOverAllNodes iterate all nodes in replica.
 func (s *replicaAssignmentInfo) RangeOverAllNodes(f func(nodeID int64)) {
     ff := func(nodeID int64) bool {
@@ -270,3 +309,32 @@ func (s replicaAssignmentInfoSortByAvailableAndRecoverable) Less(i, j int) bool
     // Otherwise unstable assignment may cause unnecessary node transfer.
     return left < right || (left == right && s.replicaAssignmentInfoSorter[i].replicaID < s.replicaAssignmentInfoSorter[j].replicaID)
 }
+
+// newReplicaSQNAssignmentHelper creates a new replicaSQNAssignmentHelper.
+func newReplicaSQNAssignmentHelper(
+    replicas []*Replica,
+    nodes typeutil.UniqueSet,
+) *replicasInSameRGAssignmentHelper {
+    // We use a fake resource group name to create a helper.
+    assignmentInfos := make([]*replicaAssignmentInfo, 0, len(replicas))
+    for _, replica := range replicas {
+        assignmentInfos = append(assignmentInfos, newReplicaSQNAssignmentInfo(replica, nodes))
+    }
+    h := &replicasInSameRGAssignmentHelper{
+        rgName:        "",
+        nodesInRG:     nodes,
+        incomingNodes: nodes.Clone(),
+        replicas:      assignmentInfos,
+    }
+    // generate incoming nodes for collection.
+    h.RangeOverReplicas(func(assignment *replicaAssignmentInfo) {
+        assignment.RangeOverAllNodes(func(nodeID int64) {
+            if nodes.Contain(nodeID) {
+                h.incomingNodes.Remove(nodeID)
+            }
+        })
+    })
+    // update expected node count for all replicas in same resource group.
+    h.updateExpectedNodeCountForReplicas(len(nodes))
+    return h
+}
diff --git a/internal/querycoordv2/observers/replica_observer.go b/internal/querycoordv2/observers/replica_observer.go
index ae0be66efc693..e14a9c872264e 100644
--- a/internal/querycoordv2/observers/replica_observer.go
+++ b/internal/querycoordv2/observers/replica_observer.go
@@ -23,11 +23,13 @@ import (
 
     "go.uber.org/zap"
 
+    "github.com/milvus-io/milvus/internal/coordinator/snmanager"
     "github.com/milvus-io/milvus/internal/querycoordv2/meta"
     "github.com/milvus-io/milvus/internal/querycoordv2/params"
     "github.com/milvus-io/milvus/internal/querycoordv2/utils"
     "github.com/milvus-io/milvus/pkg/log"
     "github.com/milvus-io/milvus/pkg/util/syncutil"
+    "github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
 // check replica, find read only nodes and remove it from replica if all segment/channel has been moved
@@ -53,8 +55,9 @@ func (ob *ReplicaObserver) Start() {
         ctx, cancel := context.WithCancel(context.Background())
         ob.cancel = cancel
 
-        ob.wg.Add(1)
+        ob.wg.Add(2)
         go ob.schedule(ctx)
+        go ob.scheduleStreamingQN(ctx)
     })
 }
 
@@ -85,12 +88,75 @@ func (ob *ReplicaObserver) schedule(ctx context.Context) {
     }
 }
 
+// scheduleStreamingQN checks the streaming query nodes in replicas.
+func (ob *ReplicaObserver) scheduleStreamingQN(ctx context.Context) {
+    defer ob.wg.Done()
+    log.Info("Start streaming query node check replica loop")
+
+    listener := snmanager.StaticStreamingNodeManager.ListenNodeChanged()
+    for {
+        ob.waitNodeChangedOrTimeout(ctx, listener)
+        if ctx.Err() != nil {
+            log.Info("Stop streaming query node check replica observer")
+            return
+        }
+
+        ids := snmanager.StaticStreamingNodeManager.GetStreamingQueryNodeIDs()
+        ob.checkStreamingQueryNodesInReplica(ids)
+    }
+}
+
 func (ob *ReplicaObserver) waitNodeChangedOrTimeout(ctx context.Context, listener *syncutil.VersionedListener) {
     ctxWithTimeout, cancel := context.WithTimeout(ctx, params.Params.QueryCoordCfg.CheckNodeInReplicaInterval.GetAsDuration(time.Second))
     defer cancel()
     listener.Wait(ctxWithTimeout)
 }
 
+func (ob *ReplicaObserver) checkStreamingQueryNodesInReplica(sqNodeIDs typeutil.UniqueSet) {
+    ctx := context.Background()
+    log := log.Ctx(ctx).WithRateGroup("qcv2.replicaObserver", 1, 60)
+    collections := ob.meta.GetAll(context.Background())
+
+    for _, collectionID := range collections {
+        ob.meta.RecoverSQNodesInCollection(context.Background(), collectionID, sqNodeIDs)
+    }
+
+    for _, collectionID := range collections {
+        replicas := ob.meta.ReplicaManager.GetByCollection(ctx, collectionID)
+        for _, replica := range replicas {
+            roSQNodes := replica.GetROSQNodes()
+            rwSQNodes := replica.GetRWSQNodes()
+            if len(roSQNodes) == 0 {
+                continue
+            }
+            removeNodes := make([]int64, 0, len(roSQNodes))
+            for _, node := range roSQNodes {
+                channels := ob.distMgr.ChannelDistManager.GetByCollectionAndFilter(replica.GetCollectionID(), meta.WithNodeID2Channel(node))
+                segments := ob.distMgr.SegmentDistManager.GetByFilter(meta.WithCollectionID(collectionID), meta.WithNodeID(node))
+                if len(channels) == 0 && len(segments) == 0 {
+                    removeNodes = append(removeNodes, node)
+                }
+            }
+            if len(removeNodes) == 0 {
+                continue
+            }
+            logger := log.With(
+                zap.Int64("collectionID", replica.GetCollectionID()),
+                zap.Int64("replicaID", replica.GetID()),
+                zap.Int64s("removedNodes", removeNodes),
+                zap.Int64s("roNodes", roSQNodes),
+                zap.Int64s("rwNodes", rwSQNodes),
+            )
+            if err := ob.meta.ReplicaManager.RemoveSQNode(ctx, replica.GetID(), removeNodes...); err != nil {
+                logger.Warn("fail to remove streaming query node from replica", zap.Error(err))
+                continue
+            }
+            logger.Info("all segment/channel has been removed from ro streaming query node, remove it from replica")
+        }
+    }
+}
+
 func (ob *ReplicaObserver) checkNodesInReplica() {
     ctx := context.Background()
     log := log.Ctx(ctx).WithRateGroup("qcv2.replicaObserver", 1, 60)
@@ -135,7 +201,7 @@ func (ob *ReplicaObserver) checkNodesInReplica() {
             logger.Warn("fail to remove node from replica", zap.Error(err))
             continue
         }
-        logger.Info("all segment/channel has been removed from ro node, try to remove it from replica")
+        logger.Info("all segment/channel has been removed from ro node, remove it from replica")
     }
 }
diff --git a/internal/querycoordv2/ops_services.go b/internal/querycoordv2/ops_services.go
index d55f734d85e16..6018c792ee86b 100644
--- a/internal/querycoordv2/ops_services.go
+++ b/internal/querycoordv2/ops_services.go
@@ -227,12 +227,18 @@ func (s *Server) ResumeNode(ctx context.Context, req *querypb.ResumeNodeRequest)
         return merr.Status(errors.Wrap(err, errMsg)), nil
     }
 
-    if s.nodeMgr.Get(req.GetNodeID()) == nil {
+    info := s.nodeMgr.Get(req.GetNodeID())
+    if info == nil {
         err := merr.WrapErrNodeNotFound(req.GetNodeID(), errMsg)
         log.Warn(errMsg, zap.Error(err))
         return merr.Status(err), nil
     }
 
+    if info.IsEmbeddedQueryNodeInStreamingNode() {
+        return merr.Status(
+            merr.WrapErrParameterInvalidMsg("embedded query node in streaming node can't be resumed")), nil
+    }
+
     s.meta.ResourceManager.HandleNodeUp(ctx, req.GetNodeID())
 
     return merr.Success(), nil
@@ -339,7 +345,7 @@ func (s *Server) TransferChannel(ctx context.Context, req *querypb.TransferChann
     // when no dst node specified, default to use all other nodes in same
     dstNodeSet := typeutil.NewUniqueSet()
     if req.GetToAllNodes() {
-        dstNodeSet.Insert(replica.GetRWNodes()...)
+        dstNodeSet.Insert(replica.GetRWSQNodes()...)
     } else {
         // check whether dstNode is healthy
         if err := s.isStoppingNode(ctx, req.GetTargetNodeID()); err != nil {
diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go
index 8ae802cf7848f..2002477afc468 100644
--- a/internal/querycoordv2/server.go
+++ b/internal/querycoordv2/server.go
@@ -855,8 +855,16 @@ func (s *Server) tryHandleNodeUp() {
 }
 
 func (s *Server) handleNodeUp(node int64) {
+    nodeInfo := s.nodeMgr.Get(node)
+    if nodeInfo == nil {
+        return
+    }
     s.taskScheduler.AddExecutor(node)
     s.distController.StartDistInstance(s.ctx, node)
+    if nodeInfo.IsEmbeddedQueryNodeInStreamingNode() {
+        // The querynode embedded in the streaming node is managed by the streaming node manager, not the resource manager.
+        return
+    }
     // need assign to new rg and replica
     s.meta.ResourceManager.HandleNodeUp(s.ctx, node)
 }
diff --git a/internal/querycoordv2/session/node_manager.go b/internal/querycoordv2/session/node_manager.go
index fafd16d6b3b43..8704087bf5a01 100644
--- a/internal/querycoordv2/session/node_manager.go
+++ b/internal/querycoordv2/session/node_manager.go
@@ -24,6 +24,7 @@ import (
     "github.com/blang/semver/v4"
     "go.uber.org/atomic"
 
+    "github.com/milvus-io/milvus/internal/util/sessionutil"
     "github.com/milvus-io/milvus/pkg/metrics"
 )
 
@@ -152,6 +153,10 @@ func (n *NodeInfo) Labels() map[string]string {
     return n.immutableInfo.Labels
 }
 
+func (n *NodeInfo) IsEmbeddedQueryNodeInStreamingNode() bool {
+    return n.immutableInfo.Labels[sessionutil.LabelStreamingNodeEmbededQueryNode] == "1"
+}
+
 func (n *NodeInfo) SegmentCnt() int {
     n.mu.RLock()
     defer n.mu.RUnlock()
diff --git a/internal/querycoordv2/utils/meta.go b/internal/querycoordv2/utils/meta.go
index 1744e2367849f..e16a0040084fd 100644
--- a/internal/querycoordv2/utils/meta.go
+++ b/internal/querycoordv2/utils/meta.go
@@ -24,6 +24,7 @@ import (
     "github.com/samber/lo"
     "go.uber.org/zap"
 
+    "github.com/milvus-io/milvus/internal/coordinator/snmanager"
     "github.com/milvus-io/milvus/internal/querycoordv2/meta"
     "github.com/milvus-io/milvus/pkg/log"
     "github.com/milvus-io/milvus/pkg/util/merr"
@@ -161,6 +162,7 @@ func SpawnReplicasWithRG(ctx context.Context, m *meta.Meta, collection int64, re
     }
 
     // Active recover it.
     RecoverReplicaOfCollection(ctx, m, collection)
+    m.RecoverSQNodesInCollection(ctx, collection, snmanager.StaticStreamingNodeManager.GetStreamingQueryNodeIDs())
     return replicas, nil
 }
diff --git a/internal/streamingcoord/server/server.go b/internal/streamingcoord/server/server.go
index 9e3db0ae4172f..da616346a7864 100644
--- a/internal/streamingcoord/server/server.go
+++ b/internal/streamingcoord/server/server.go
@@ -6,6 +6,7 @@ import (
     "go.uber.org/zap"
     "google.golang.org/grpc"
 
+    "github.com/milvus-io/milvus/internal/coordinator/snmanager"
     "github.com/milvus-io/milvus/internal/streamingcoord/server/balancer"
     _ "github.com/milvus-io/milvus/internal/streamingcoord/server/balancer/policy" // register the balancer policy
     "github.com/milvus-io/milvus/internal/streamingcoord/server/service"
@@ -56,6 +57,7 @@ func (s *Server) initBasicComponent(ctx context.Context) error {
         return err
     }
     s.balancer.Set(balancer)
+    snmanager.StaticStreamingNodeManager.SetBalancerReady(balancer)
     return err
 }
diff --git a/internal/streamingnode/server/flusher/flusherimpl/channel_lifetime.go b/internal/streamingnode/server/flusher/flusherimpl/channel_lifetime.go
index 51965267f56fb..994a856daf214 100644
--- a/internal/streamingnode/server/flusher/flusherimpl/channel_lifetime.go
+++ b/internal/streamingnode/server/flusher/flusherimpl/channel_lifetime.go
@@ -141,7 +141,7 @@ func (c *channelLifetime) Run() error {
         func() { go func() { c.Cancel() }() },
     )
     if err != nil {
-        handler.Close()
+        scanner.Close()
         return err
     }
     ds.Start()
diff --git a/internal/util/sessionutil/session_util.go b/internal/util/sessionutil/session_util.go
index 3ed57a9245907..53e03f792fcb3 100644
--- a/internal/util/sessionutil/session_util.go
+++ b/internal/util/sessionutil/session_util.go
@@ -48,8 +48,9 @@ const (
     // DefaultServiceRoot default root path used in kv by Session
     DefaultServiceRoot = "session/"
     // DefaultIDKey default id key for Session
-    DefaultIDKey         = "id"
-    SupportedLabelPrefix = "MILVUS_SERVER_LABEL_"
+    DefaultIDKey                       = "id"
+    SupportedLabelPrefix               = "MILVUS_SERVER_LABEL_"
+    LabelStreamingNodeEmbededQueryNode = "QUERYNODE_STREAMING-EMBEDED"
 )
 
 // SessionEventType session event type
diff --git a/internal/util/streamingutil/env.go b/internal/util/streamingutil/env.go
index 8c81c685fc1bf..ecb2e11d7a80e 100644
--- a/internal/util/streamingutil/env.go
+++ b/internal/util/streamingutil/env.go
@@ -1,6 +1,10 @@
 package streamingutil
 
-import "os"
+import (
+    "os"
+
+    "github.com/milvus-io/milvus/internal/util/sessionutil"
+)
 
 const MilvusStreamingServiceEnabled = "MILVUS_STREAMING_SERVICE_ENABLED"
 
@@ -16,3 +20,14 @@ func MustEnableStreamingService() {
         panic("start a streaming node without enabling streaming service, please set environment variable MILVUS_STREAMING_SERVICE_ENABLED = 1")
     }
 }
+
+// EnableEmbededQueryNode sets the server label for the embedded query node.
+func EnableEmbededQueryNode() {
+    MustEnableStreamingService()
+    os.Setenv(sessionutil.SupportedLabelPrefix+sessionutil.LabelStreamingNodeEmbededQueryNode, "1")
+}
+
+// IsEmbeddedQueryNode returns whether the current node is an embedded query node in a streaming node.
+func IsEmbeddedQueryNode() bool {
+    return os.Getenv(sessionutil.SupportedLabelPrefix+sessionutil.LabelStreamingNodeEmbededQueryNode) == "1"
+}
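Taken together, the label plumbing works end to end like this: GetMilvusRoles calls streamingutil.EnableEmbededQueryNode(), which exports the session label through the process environment, and the query coord later reads it back from the node's session labels via NodeInfo.IsEmbeddedQueryNodeInStreamingNode(). A minimal sketch against the helpers added in env.go (the main wrapper here is illustrative, not part of the patch):

    package main

    import (
        "fmt"

        "github.com/milvus-io/milvus/internal/util/streamingutil"
    )

    func main() {
        // Panics unless MILVUS_STREAMING_SERVICE_ENABLED=1 is set, mirroring the
        // streamingnode startup path in cmd/milvus/util.go.
        streamingutil.EnableEmbededQueryNode()

        // The label round-trips through the environment variable
        // MILVUS_SERVER_LABEL_QUERYNODE_STREAMING-EMBEDED=1.
        fmt.Println(streamingutil.IsEmbeddedQueryNode()) // true
    }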
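Consumers of StaticStreamingNodeManager follow the pattern ReplicaObserver.scheduleStreamingQN uses: block on the versioned listener, then re-read the whole node set rather than diffing individual events. A minimal sketch, assuming only the packages introduced in this diff (the function and package names are illustrative):

    package sketch

    import (
        "context"

        "github.com/milvus-io/milvus/internal/coordinator/snmanager"
    )

    // watchEmbeddedQueryNodes wakes on every assignment change published by the
    // streaming coord balancer and reads the current embedded query node set.
    func watchEmbeddedQueryNodes(ctx context.Context) {
        listener := snmanager.StaticStreamingNodeManager.ListenNodeChanged()
        for {
            listener.Wait(ctx) // returns on notify or on ctx cancellation
            if ctx.Err() != nil {
                return
            }
            ids := snmanager.StaticStreamingNodeManager.GetStreamingQueryNodeIDs()
            _ = ids // e.g. feed into ReplicaManager.RecoverSQNodesInCollection
        }
    }

Re-reading the full set keeps the consumer idempotent even if several notifications coalesce into one wake-up.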
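The rw/ro/removed lifecycle that RecoverSQNodesInCollection and the observer drive can be shown on a single replica. A minimal sketch using the mutable replica API added above; it assumes placement inside internal/querycoordv2/meta and walks one node (ID chosen arbitrarily) through every transition:

    package meta

    // sqNodeLifecycle walks one streaming query node through the state machine
    // used by RecoverSQNodesInCollection and the replica observer.
    func sqNodeLifecycle(replica *Replica, node int64) *Replica {
        mutable := replica.CopyForWrite()
        mutable.AddRWSQNode(node)  // unused/ro -> rw: eligible to watch new channels
        mutable.AddROSQNode(node)  // rw -> ro: keeps serving, receives no new channels
        mutable.RemoveSQNode(node) // ro -> unused: once its channels/segments are gone
        return mutable.IntoReplica()
    }

The observer only takes the last step after confirming the node no longer appears in the channel or segment distribution, which is what keeps the handoff lossless.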