Skip to content

Commit

Permalink
enhance: add rw/ro streaming query node replica management
Browse files Browse the repository at this point in the history
Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh committed Dec 24, 2024
1 parent 877cd80 commit 0c32ff6
Show file tree
Hide file tree
Showing 22 changed files with 647 additions and 97 deletions.
6 changes: 6 additions & 0 deletions cmd/milvus/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,9 @@ func GetMilvusRoles(args []string, flags *flag.FlagSet) *roles.MilvusRoles {
role.EnableIndexNode = true
case typeutil.StreamingNodeRole:
streamingutil.MustEnableStreamingService()
streamingutil.EnableEmbededQueryNode()
role.EnableStreamingNode = true
role.EnableQueryNode = true
case typeutil.StandaloneRole, typeutil.EmbeddedRole:
role.EnableRootCoord = true
role.EnableProxy = true
Expand All @@ -175,6 +177,10 @@ func GetMilvusRoles(args []string, flags *flag.FlagSet) *roles.MilvusRoles {
role.EnableIndexNode = enableIndexNode
role.EnableProxy = enableProxy
role.EnableStreamingNode = enableStreamingNode
if enableStreamingNode && !enableQueryNode {
role.EnableQueryNode = true
streamingutil.EnableEmbededQueryNode()
}
default:
fmt.Fprintf(os.Stderr, "Unknown server type = %s\n%s", serverType, getHelp())
os.Exit(-1)
Expand Down
106 changes: 106 additions & 0 deletions internal/coordinator/snmanager/streaming_node_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package snmanager

import (
"context"
"sync"

"github.com/cockroachdb/errors"
"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/streamingcoord/server/balancer"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/syncutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

var StaticStreamingNodeManager = newStreamingNodeManager()

func newStreamingNodeManager() *StreamingNodeManager {
snm := &StreamingNodeManager{
notifier: syncutil.NewAsyncTaskNotifier[struct{}](),
balancer: syncutil.NewFuture[balancer.Balancer](),
cond: syncutil.NewContextCond(&sync.Mutex{}),
latestAssignments: make(map[string]types.PChannelInfoAssigned),
streamingNodes: typeutil.NewUniqueSet(),
nodeChangedNotifier: syncutil.NewVersionedNotifier(),
}
go snm.execute()
return snm
}

// StreamingNodeManager is a manager for manage the querynode that embeded into streaming node.
// StreamingNodeManager is exclusive with ResourceManager.
type StreamingNodeManager struct {
notifier *syncutil.AsyncTaskNotifier[struct{}]
balancer *syncutil.Future[balancer.Balancer]
// The coord is merged after 2.6, so we don't need to make distribution safe.
cond *syncutil.ContextCond
latestAssignments map[string]types.PChannelInfoAssigned // The latest assignments info got from streaming coord balance module.
streamingNodes typeutil.UniqueSet
nodeChangedNotifier *syncutil.VersionedNotifier // used to notify that node in streaming node manager has been changed.
}

// GetWALLocated returns the server id of the node that the wal of the vChannel is located.
func (s *StreamingNodeManager) GetWALLocated(vChannel string) int64 {
pchannel := funcutil.ToPhysicalChannel(vChannel)
var targetServerID int64

s.cond.L.Lock()
for {
if assignment, ok := s.latestAssignments[pchannel]; ok {
targetServerID = assignment.Node.ServerID
break
}
s.cond.Wait(context.Background())
}
s.cond.L.Unlock()
return targetServerID
}

// GetStreamingQueryNodeIDs returns the server ids of the streaming query nodes.
func (s *StreamingNodeManager) GetStreamingQueryNodeIDs() typeutil.UniqueSet {
s.cond.L.Lock()
defer s.cond.L.Unlock()
return s.streamingNodes.Clone()
}

// ListenNodeChanged returns a listener for node changed event.
func (s *StreamingNodeManager) ListenNodeChanged() *syncutil.VersionedListener {
return s.nodeChangedNotifier.Listen(syncutil.VersionedListenAtEarliest)
}

// SetBalancerReady set the balancer ready for the streaming node manager from streamingcoord initialization.
func (s *StreamingNodeManager) SetBalancerReady(b balancer.Balancer) {
s.balancer.Set(b)
}

func (s *StreamingNodeManager) execute() (err error) {
defer s.notifier.Finish(struct{}{})

balancer, err := s.balancer.GetWithContext(s.notifier.Context())
if err != nil {
return errors.Wrap(err, "failed to wait balancer ready")
}
for {
if err := balancer.WatchChannelAssignments(s.notifier.Context(), func(
version typeutil.VersionInt64Pair,
relations []types.PChannelInfoAssigned,
) error {
s.cond.LockAndBroadcast()
s.latestAssignments = make(map[string]types.PChannelInfoAssigned)
s.streamingNodes = typeutil.NewUniqueSet()
for _, relation := range relations {
s.latestAssignments[relation.Channel.Name] = relation
s.streamingNodes.Insert(relation.Node.ServerID)
}
s.nodeChangedNotifier.NotifyAll()
log.Info("streaming node manager updated", zap.Any("assignments", s.latestAssignments), zap.Any("streamingNodes", s.streamingNodes))
s.cond.L.Unlock()
return nil
}); err != nil {
return err
}
}
}
11 changes: 9 additions & 2 deletions internal/proto/query_coord.proto
Original file line number Diff line number Diff line change
Expand Up @@ -688,11 +688,18 @@ message ChannelNodeInfo {
message Replica {
int64 ID = 1;
int64 collectionID = 2;
// nodes and ro_nodes can only load sealed segment.
// only manage the legacy querynode that not embedded in the streamingnode.
repeated int64 nodes = 3; // all (read and write) nodes. mutual exclusive with ro_nodes.
string resource_group = 4;
repeated int64 ro_nodes = 5; // the in-using node but should not be assigned to these replica.
// can not load new channel or segment on it anymore.
map<string, ChannelNodeInfo> channel_node_infos = 6;
// cannot load segment on it anymore.
map<string, ChannelNodeInfo> channel_node_infos = 6;
// rw_sq_nodes and ro_sq_nodes can only watch channel and assign segment, will be removed in 3.0.
// only manage the querynode embedded in the streamingnode.
repeated int64 rw_sq_nodes = 7; // all (read and write) nodes. mutual exclusive with ro_sq_nodes.
repeated int64 ro_sq_nodes = 8; // the in-using node but should not be assigned to these replica.
// cannot watch channel on it anymore.
}

enum SyncType {
Expand Down
11 changes: 7 additions & 4 deletions internal/querycoordv2/balance/balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,22 +119,25 @@ func (b *RoundRobinBalancer) AssignChannel(ctx context.Context, collectionID int
if len(nodesInfo) == 0 {
return nil
}

channels, plans, scoreDelta := assignChannelToWALLocatedFirstForNodeInfo(channels, nodesInfo)

sort.Slice(nodesInfo, func(i, j int) bool {
cnt1, cnt2 := nodesInfo[i].ChannelCnt(), nodesInfo[j].ChannelCnt()
id1, id2 := nodesInfo[i].ID(), nodesInfo[j].ID()
delta1, delta2 := b.scheduler.GetChannelTaskDelta(id1, -1), b.scheduler.GetChannelTaskDelta(id2, -1)
delta1, delta2 := b.scheduler.GetChannelTaskDelta(id1, -1)-scoreDelta[id1], b.scheduler.GetChannelTaskDelta(id2, -1)-scoreDelta[id2]
return cnt1+delta1 < cnt2+delta2
})
ret := make([]ChannelAssignPlan, 0, len(channels))

for i, c := range channels {
plan := ChannelAssignPlan{
Channel: c,
From: -1,
To: nodesInfo[i%len(nodesInfo)].ID(),
}
ret = append(ret, plan)
plans = append(plans, plan)
}
return ret
return plans
}

func (b *RoundRobinBalancer) BalanceReplica(ctx context.Context, replica *meta.Replica) ([]SegmentAssignPlan, []ChannelAssignPlan) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func (b *ChannelLevelScoreBalancer) BalanceReplica(ctx context.Context, replica
return b.ScoreBasedBalancer.BalanceReplica(ctx, replica)
}

// TODO: assign by channel
channelPlans = make([]ChannelAssignPlan, 0)
segmentPlans = make([]SegmentAssignPlan, 0)
for channelName := range channels {
Expand Down
60 changes: 35 additions & 25 deletions internal/querycoordv2/balance/multi_target_balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,48 +485,58 @@ func (b *MultiTargetBalancer) BalanceReplica(ctx context.Context, replica *meta.
}
}()

if replica.NodesCount() == 0 {
return nil, nil
stoppingBalance := paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool()

channelPlans = b.balanceChannels(ctx, br, replica, stoppingBalance)
if len(channelPlans) != 0 {
segmentPlans = b.balanceSegments(ctx, replica, stoppingBalance)
}
return
}

func (b *MultiTargetBalancer) balanceChannels(ctx context.Context, br *balanceReport, replica *meta.Replica, stoppingBalance bool) []ChannelAssignPlan {
rwNodes := replica.GetRWSQNodes()
roNodes := replica.GetROSQNodes()
if len(rwNodes) == 0 || !b.permitBalanceChannel(replica.GetCollectionID()) {
return nil
}

if len(roNodes) != 0 {
if !stoppingBalance {
log.RatedInfo(10, "stopping balance is disabled!", zap.Int64s("stoppingNode", roNodes))
return nil
}
return b.genStoppingChannelPlan(ctx, replica, rwNodes, roNodes)
}

if paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() {
return b.genChannelPlan(ctx, br, replica, rwNodes)
}
return nil
}

func (b *MultiTargetBalancer) balanceSegments(ctx context.Context, replica *meta.Replica, stoppingBalance bool) []SegmentAssignPlan {
rwNodes := replica.GetRWNodes()
roNodes := replica.GetRONodes()

if len(rwNodes) == 0 {
// no available nodes to balance
return nil, nil
if len(rwNodes) == 0 || !b.permitBalanceSegment(replica.GetCollectionID()) {
return nil
}

// print current distribution before generating plans
segmentPlans, channelPlans = make([]SegmentAssignPlan, 0), make([]ChannelAssignPlan, 0)
if len(roNodes) != 0 {
if !paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool() {
if !stoppingBalance {
log.RatedInfo(10, "stopping balance is disabled!", zap.Int64s("stoppingNode", roNodes))
return nil, nil
return nil
}

log.Info("Handle stopping nodes",
zap.Any("stopping nodes", roNodes),
zap.Any("available nodes", rwNodes),
)
// handle stopped nodes here, have to assign segments on stopping nodes to nodes with the smallest score
if b.permitBalanceChannel(replica.GetCollectionID()) {
channelPlans = append(channelPlans, b.genStoppingChannelPlan(ctx, replica, rwNodes, roNodes)...)
}
if len(channelPlans) == 0 && b.permitBalanceSegment(replica.GetCollectionID()) {
segmentPlans = append(segmentPlans, b.genStoppingSegmentPlan(ctx, replica, rwNodes, roNodes)...)
}
} else {
if paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() && b.permitBalanceChannel(replica.GetCollectionID()) {
channelPlans = append(channelPlans, b.genChannelPlan(ctx, br, replica, rwNodes)...)
}

if len(channelPlans) == 0 && b.permitBalanceSegment(replica.GetCollectionID()) {
segmentPlans = b.genSegmentPlan(ctx, replica, rwNodes)
}
return b.genStoppingSegmentPlan(ctx, replica, rwNodes, roNodes)
}

return segmentPlans, channelPlans
return b.genSegmentPlan(ctx, replica, rwNodes)
}

func (b *MultiTargetBalancer) genSegmentPlan(ctx context.Context, replica *meta.Replica, rwNodes []int64) []SegmentAssignPlan {
Expand Down
68 changes: 41 additions & 27 deletions internal/querycoordv2/balance/rowcount_based_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,14 @@ func (b *RowCountBasedBalancer) AssignChannel(ctx context.Context, collectionID
if len(nodeItems) == 0 {
return nil
}

channels, plans := assignChannelToWALLocatedFirst(channels, nodeItems)

queue := newPriorityQueue()
for _, item := range nodeItems {
queue.push(item)
}

plans := make([]ChannelAssignPlan, 0, len(channels))
for _, c := range channels {
// pick the node with the least channel num and allocate to it.
ni := queue.pop().(*nodeItem)
Expand Down Expand Up @@ -181,47 +183,59 @@ func (b *RowCountBasedBalancer) BalanceReplica(ctx context.Context, replica *met
log.Info("balance plan generated", zap.Stringers("report details", br.records))
}
}()
if replica.NodesCount() == 0 {
return nil, nil

stoppingBalance := paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool()

channelPlans = b.balanceChannels(ctx, br, replica, stoppingBalance)
if len(channelPlans) != 0 {
segmentPlans = b.balanceSegments(ctx, replica, stoppingBalance)
}
return
}

func (b *RowCountBasedBalancer) balanceChannels(ctx context.Context, br *balanceReport, replica *meta.Replica, stoppingBalance bool) []ChannelAssignPlan {
rwNodes := replica.GetRWSQNodes()
roNodes := replica.GetROSQNodes()
if len(rwNodes) == 0 || !b.permitBalanceChannel(replica.GetCollectionID()) {
return nil
}

if len(roNodes) != 0 {
if !stoppingBalance {
log.RatedInfo(10, "stopping balance is disabled!", zap.Int64s("stoppingNode", roNodes))
return nil
}
return b.genStoppingChannelPlan(ctx, replica, rwNodes, roNodes)
}

if paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() {
return b.genChannelPlan(ctx, br, replica, rwNodes)
}
return nil
}

func (b *RowCountBasedBalancer) balanceSegments(ctx context.Context, replica *meta.Replica, stoppingBalance bool) []SegmentAssignPlan {
rwNodes := replica.GetRWNodes()
roNodes := replica.GetRONodes()
if len(rwNodes) == 0 {
// no available nodes to balance
return nil, nil
}

segmentPlans, channelPlans = make([]SegmentAssignPlan, 0), make([]ChannelAssignPlan, 0)
if len(rwNodes) == 0 || !b.permitBalanceSegment(replica.GetCollectionID()) {
return nil
}
// print current distribution before generating plans
if len(roNodes) != 0 {
if !paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool() {
if !stoppingBalance {
log.RatedInfo(10, "stopping balance is disabled!", zap.Int64s("stoppingNode", roNodes))
return nil, nil
return nil
}

log.Info("Handle stopping nodes",
zap.Any("stopping nodes", roNodes),
zap.Any("available nodes", rwNodes),
)
// handle stopped nodes here, have to assign segments on stopping nodes to nodes with the smallest score
if b.permitBalanceChannel(replica.GetCollectionID()) {
channelPlans = append(channelPlans, b.genStoppingChannelPlan(ctx, replica, rwNodes, roNodes)...)
}

if len(channelPlans) == 0 && b.permitBalanceSegment(replica.GetCollectionID()) {
segmentPlans = append(segmentPlans, b.genStoppingSegmentPlan(ctx, replica, rwNodes, roNodes)...)
}
} else {
if paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() && b.permitBalanceChannel(replica.GetCollectionID()) {
channelPlans = append(channelPlans, b.genChannelPlan(ctx, br, replica, rwNodes)...)
}

if len(channelPlans) == 0 && b.permitBalanceSegment(replica.GetCollectionID()) {
segmentPlans = append(segmentPlans, b.genSegmentPlan(ctx, replica, rwNodes)...)
}
return b.genStoppingSegmentPlan(ctx, replica, rwNodes, roNodes)
}

return segmentPlans, channelPlans
return b.genSegmentPlan(ctx, replica, rwNodes)
}

func (b *RowCountBasedBalancer) genStoppingSegmentPlan(ctx context.Context, replica *meta.Replica, rwNodes []int64, roNodes []int64) []SegmentAssignPlan {
Expand Down
Loading

0 comments on commit 0c32ff6

Please sign in to comment.