Skip to content

Commit

Permalink
fix: add unittest
Browse files Browse the repository at this point in the history
Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh committed Dec 28, 2024
1 parent ba72625 commit 70a9f34
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 4 deletions.
62 changes: 62 additions & 0 deletions internal/coordinator/snmanager/streaming_node_manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package snmanager

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"github.com/milvus-io/milvus/internal/mocks/streamingcoord/server/mock_balancer"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

type pChannelInfoAssigned struct {
version typeutil.VersionInt64Pair
pchannels []types.PChannelInfoAssigned
}

func TestStreamingNodeManager(t *testing.T) {
m := newStreamingNodeManager()
b := mock_balancer.NewMockBalancer(t)

ch := make(chan pChannelInfoAssigned, 1)
b.EXPECT().WatchChannelAssignments(mock.Anything, mock.Anything).Run(
func(ctx context.Context, cb func(typeutil.VersionInt64Pair, []types.PChannelInfoAssigned) error) {
for {
select {
case <-ctx.Done():
return
case p := <-ch:
cb(p.version, p.pchannels)
}
}
})
m.SetBalancerReady(b)

streamingNodes := m.GetStreamingQueryNodeIDs()
assert.Empty(t, streamingNodes)

ch <- pChannelInfoAssigned{
version: typeutil.VersionInt64Pair{
Global: 1,
Local: 1,
},
pchannels: []types.PChannelInfoAssigned{
{
Channel: types.PChannelInfo{Name: "a_test", Term: 1},
Node: types.StreamingNodeInfo{ServerID: 1, Address: "localhost:1"},
},
},
}

listener := m.ListenNodeChanged()
err := listener.Wait(context.Background())
assert.NoError(t, err)

node := m.GetWALLocated("a_test")
assert.Equal(t, node, int64(1))
streamingNodes = m.GetStreamingQueryNodeIDs()
assert.Equal(t, len(streamingNodes), 1)
}
5 changes: 4 additions & 1 deletion internal/querycoordv2/observers/replica_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/syncutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
Expand Down Expand Up @@ -57,7 +58,9 @@ func (ob *ReplicaObserver) Start() {

ob.wg.Add(2)
go ob.schedule(ctx)
go ob.scheduleStreamingQN(ctx)
if streamingutil.IsStreamingServiceEnabled() {
go ob.scheduleStreamingQN(ctx)
}
})
}

Expand Down
13 changes: 11 additions & 2 deletions internal/querycoordv2/ops_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/typeutil"
Expand Down Expand Up @@ -273,7 +274,11 @@ func (s *Server) TransferSegment(ctx context.Context, req *querypb.TransferSegme
// when no dst node specified, default to use all other nodes in same
dstNodeSet := typeutil.NewUniqueSet()
if req.GetToAllNodes() {
dstNodeSet.Insert(replica.GetRWNodes()...)
if streamingutil.IsStreamingServiceEnabled() {
dstNodeSet.Insert(replica.GetRWSQNodes()...)
} else {
dstNodeSet.Insert(replica.GetRWNodes()...)
}
} else {
// check whether dstNode is healthy
if err := s.isStoppingNode(ctx, req.GetTargetNodeID()); err != nil {
Expand Down Expand Up @@ -345,7 +350,11 @@ func (s *Server) TransferChannel(ctx context.Context, req *querypb.TransferChann
// when no dst node specified, default to use all other nodes in same
dstNodeSet := typeutil.NewUniqueSet()
if req.GetToAllNodes() {
dstNodeSet.Insert(replica.GetRWSQNodes()...)
if streamingutil.IsStreamingServiceEnabled() {
dstNodeSet.Insert(replica.GetRWSQNodes()...)
} else {
dstNodeSet.Insert(replica.GetRWNodes()...)
}
} else {
// check whether dstNode is healthy
if err := s.isStoppingNode(ctx, req.GetTargetNodeID()); err != nil {
Expand Down
5 changes: 4 additions & 1 deletion internal/querycoordv2/utils/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/milvus-io/milvus/internal/coordinator/snmanager"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/typeutil"
Expand Down Expand Up @@ -162,7 +163,9 @@ func SpawnReplicasWithRG(ctx context.Context, m *meta.Meta, collection int64, re
}
// Active recover it.
RecoverReplicaOfCollection(ctx, m, collection)
m.RecoverSQNodesInCollection(ctx, collection, snmanager.StaticStreamingNodeManager.GetStreamingQueryNodeIDs())
if streamingutil.IsStreamingServiceEnabled() {
m.RecoverSQNodesInCollection(ctx, collection, snmanager.StaticStreamingNodeManager.GetStreamingQueryNodeIDs())
}
return replicas, nil
}

Expand Down

0 comments on commit 70a9f34

Please sign in to comment.