diff --git a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go index 1712be8c75da2..a70d5056b1755 100644 --- a/internal/datacoord/channel_manager.go +++ b/internal/datacoord/channel_manager.go @@ -778,17 +778,25 @@ func (c *ChannelManager) Release(nodeID UniqueID, channelName string) error { // Reassign reassigns a channel to another DataNode. func (c *ChannelManager) Reassign(originNodeID UniqueID, channelName string) error { - c.mu.Lock() - defer c.mu.Unlock() - + c.mu.RLock() ch := c.getChannelByNodeAndName(originNodeID, channelName) if ch == nil { + c.mu.RUnlock() return fmt.Errorf("fail to find matching nodeID: %d with channelName: %s", originNodeID, channelName) } + c.mu.RUnlock() reallocates := &NodeChannelInfo{originNodeID, []*channel{ch}} + isDropped := c.isMarkedDrop(channelName, ch.CollectionID) + + c.mu.Lock() + defer c.mu.Unlock() + ch = c.getChannelByNodeAndName(originNodeID, channelName) + if ch == nil { + return fmt.Errorf("fail to find matching nodeID: %d with channelName: %s", originNodeID, channelName) + } - if c.isMarkedDrop(channelName, ch.CollectionID) { + if isDropped { if err := c.remove(originNodeID, ch); err != nil { return fmt.Errorf("failed to remove watch info: %v,%s", ch, err.Error()) } @@ -817,13 +825,13 @@ func (c *ChannelManager) Reassign(originNodeID UniqueID, channelName string) err // CleanupAndReassign tries to clean up datanode's subscription, and then reassigns the channel to another DataNode. func (c *ChannelManager) CleanupAndReassign(nodeID UniqueID, channelName string) error { - c.mu.Lock() - defer c.mu.Unlock() - + c.mu.RLock() chToCleanUp := c.getChannelByNodeAndName(nodeID, channelName) if chToCleanUp == nil { + c.mu.RUnlock() return fmt.Errorf("failed to find matching channel: %s and node: %d", channelName, nodeID) } + c.mu.RUnlock() if c.msgstreamFactory == nil { log.Warn("msgstream factory is not set, unable to clean up topics") @@ -834,8 +842,16 @@ func (c *ChannelManager) CleanupAndReassign(nodeID UniqueID, channelName string) } reallocates := &NodeChannelInfo{nodeID, []*channel{chToCleanUp}} + isDropped := c.isMarkedDrop(channelName, chToCleanUp.CollectionID) + + c.mu.Lock() + defer c.mu.Unlock() + chToCleanUp = c.getChannelByNodeAndName(nodeID, channelName) + if chToCleanUp == nil { + return fmt.Errorf("failed to find matching channel: %s and node: %d", channelName, nodeID) + } - if c.isMarkedDrop(channelName, chToCleanUp.CollectionID) { + if isDropped { if err := c.remove(nodeID, chToCleanUp); err != nil { return fmt.Errorf("failed to remove watch info: %v,%s", chToCleanUp, err.Error()) } diff --git a/internal/datacoord/channel_manager_test.go b/internal/datacoord/channel_manager_test.go index b3b87d6776110..14b14d3a421dd 100644 --- a/internal/datacoord/channel_manager_test.go +++ b/internal/datacoord/channel_manager_test.go @@ -26,6 +26,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/milvus-io/milvus/internal/kv" @@ -568,6 +569,113 @@ func TestChannelManager(t *testing.T) { waitAndCheckState(t, watchkv, datapb.ChannelWatchState_ToWatch, remainTest.nodeID, remainTest.chName, collectionID) }) + t.Run("test Reassign with get channel fail", func(t *testing.T) { + chManager, err := NewChannelManager(watchkv, newMockHandler()) + require.NoError(t, err) + + err = chManager.Reassign(1, "not-exists-channelName") + assert.Error(t, err) + }) + + t.Run("test Reassign with dropped channel", func(t *testing.T) { + collectionID := UniqueID(5) + handler := NewNMockHandler(t) + handler.EXPECT(). + CheckShouldDropChannel(mock.Anything, mock.Anything). + Return(true) + handler.EXPECT().FinishDropChannel(mock.Anything).Return(nil) + chManager, err := NewChannelManager(watchkv, handler) + require.NoError(t, err) + + chManager.store.Add(1) + ops := getOpsWithWatchInfo(1, &channel{Name: "chan", CollectionID: collectionID}) + err = chManager.store.Update(ops) + require.NoError(t, err) + + assert.Equal(t, 1, chManager.store.GetNodeChannelCount(1)) + err = chManager.Reassign(1, "chan") + assert.NoError(t, err) + assert.Equal(t, 0, chManager.store.GetNodeChannelCount(1)) + }) + + t.Run("test Reassign-channel not found", func(t *testing.T) { + var chManager *ChannelManager + var err error + handler := NewNMockHandler(t) + handler.EXPECT(). + CheckShouldDropChannel(mock.Anything, mock.Anything). + Run(func(channel string, collectionID int64) { + channels, err := chManager.store.Delete(1) + assert.NoError(t, err) + assert.Equal(t, 1, len(channels)) + }).Return(true).Once() + + chManager, err = NewChannelManager(watchkv, handler) + require.NoError(t, err) + + chManager.store.Add(1) + ops := getOpsWithWatchInfo(1, &channel{Name: "chan", CollectionID: 1}) + err = chManager.store.Update(ops) + require.NoError(t, err) + + assert.Equal(t, 1, chManager.store.GetNodeChannelCount(1)) + err = chManager.Reassign(1, "chan") + assert.Error(t, err) + }) + + t.Run("test CleanupAndReassign-channel not found", func(t *testing.T) { + var chManager *ChannelManager + var err error + handler := NewNMockHandler(t) + handler.EXPECT(). + CheckShouldDropChannel(mock.Anything, mock.Anything). + Run(func(channel string, collectionID int64) { + channels, err := chManager.store.Delete(1) + assert.NoError(t, err) + assert.Equal(t, 1, len(channels)) + }).Return(true).Once() + + chManager, err = NewChannelManager(watchkv, handler) + require.NoError(t, err) + + chManager.store.Add(1) + ops := getOpsWithWatchInfo(1, &channel{Name: "chan", CollectionID: 1}) + err = chManager.store.Update(ops) + require.NoError(t, err) + + assert.Equal(t, 1, chManager.store.GetNodeChannelCount(1)) + err = chManager.CleanupAndReassign(1, "chan") + assert.Error(t, err) + }) + + t.Run("test CleanupAndReassign with get channel fail", func(t *testing.T) { + chManager, err := NewChannelManager(watchkv, newMockHandler()) + require.NoError(t, err) + + err = chManager.CleanupAndReassign(1, "not-exists-channelName") + assert.Error(t, err) + }) + + t.Run("test CleanupAndReassign with dropped channel", func(t *testing.T) { + handler := NewNMockHandler(t) + handler.EXPECT(). + CheckShouldDropChannel(mock.Anything, mock.Anything). + Return(true) + handler.EXPECT().FinishDropChannel(mock.Anything).Return(nil) + chManager, err := NewChannelManager(watchkv, handler) + require.NoError(t, err) + + chManager.store.Add(1) + ops := getOpsWithWatchInfo(1, &channel{Name: "chan", CollectionID: 1}) + err = chManager.store.Update(ops) + require.NoError(t, err) + + assert.Equal(t, 1, chManager.store.GetNodeChannelCount(1)) + err = chManager.CleanupAndReassign(1, "chan") + assert.NoError(t, err) + assert.Equal(t, 0, chManager.store.GetNodeChannelCount(1)) + }) + t.Run("test DeleteNode", func(t *testing.T) { defer watchkv.RemoveWithPrefix("")