Skip to content

Commit

Permalink
Reduce write lock scope in channel manager (#27823)
Browse files Browse the repository at this point in the history
Signed-off-by: jaime <[email protected]>
  • Loading branch information
jaime0815 authored Oct 20, 2023
1 parent 086220d commit d2dbbbc
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 8 deletions.
32 changes: 24 additions & 8 deletions internal/datacoord/channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -778,17 +778,25 @@ func (c *ChannelManager) Release(nodeID UniqueID, channelName string) error {

// Reassign reassigns a channel to another DataNode.
func (c *ChannelManager) Reassign(originNodeID UniqueID, channelName string) error {
c.mu.Lock()
defer c.mu.Unlock()

c.mu.RLock()
ch := c.getChannelByNodeAndName(originNodeID, channelName)
if ch == nil {
c.mu.RUnlock()
return fmt.Errorf("fail to find matching nodeID: %d with channelName: %s", originNodeID, channelName)
}
c.mu.RUnlock()

reallocates := &NodeChannelInfo{originNodeID, []*channel{ch}}
isDropped := c.isMarkedDrop(channelName, ch.CollectionID)

c.mu.Lock()
defer c.mu.Unlock()
ch = c.getChannelByNodeAndName(originNodeID, channelName)
if ch == nil {
return fmt.Errorf("fail to find matching nodeID: %d with channelName: %s", originNodeID, channelName)
}

if c.isMarkedDrop(channelName, ch.CollectionID) {
if isDropped {
if err := c.remove(originNodeID, ch); err != nil {
return fmt.Errorf("failed to remove watch info: %v,%s", ch, err.Error())
}
Expand Down Expand Up @@ -817,13 +825,13 @@ func (c *ChannelManager) Reassign(originNodeID UniqueID, channelName string) err

// CleanupAndReassign tries to clean up datanode's subscription, and then reassigns the channel to another DataNode.
func (c *ChannelManager) CleanupAndReassign(nodeID UniqueID, channelName string) error {
c.mu.Lock()
defer c.mu.Unlock()

c.mu.RLock()
chToCleanUp := c.getChannelByNodeAndName(nodeID, channelName)
if chToCleanUp == nil {
c.mu.RUnlock()
return fmt.Errorf("failed to find matching channel: %s and node: %d", channelName, nodeID)
}
c.mu.RUnlock()

if c.msgstreamFactory == nil {
log.Warn("msgstream factory is not set, unable to clean up topics")
Expand All @@ -834,8 +842,16 @@ func (c *ChannelManager) CleanupAndReassign(nodeID UniqueID, channelName string)
}

reallocates := &NodeChannelInfo{nodeID, []*channel{chToCleanUp}}
isDropped := c.isMarkedDrop(channelName, chToCleanUp.CollectionID)

c.mu.Lock()
defer c.mu.Unlock()
chToCleanUp = c.getChannelByNodeAndName(nodeID, channelName)
if chToCleanUp == nil {
return fmt.Errorf("failed to find matching channel: %s and node: %d", channelName, nodeID)
}

if c.isMarkedDrop(channelName, chToCleanUp.CollectionID) {
if isDropped {
if err := c.remove(nodeID, chToCleanUp); err != nil {
return fmt.Errorf("failed to remove watch info: %v,%s", chToCleanUp, err.Error())
}
Expand Down
108 changes: 108 additions & 0 deletions internal/datacoord/channel_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/milvus-io/milvus/internal/kv"
Expand Down Expand Up @@ -568,6 +569,113 @@ func TestChannelManager(t *testing.T) {
waitAndCheckState(t, watchkv, datapb.ChannelWatchState_ToWatch, remainTest.nodeID, remainTest.chName, collectionID)
})

t.Run("test Reassign with get channel fail", func(t *testing.T) {
chManager, err := NewChannelManager(watchkv, newMockHandler())
require.NoError(t, err)

err = chManager.Reassign(1, "not-exists-channelName")
assert.Error(t, err)
})

t.Run("test Reassign with dropped channel", func(t *testing.T) {
collectionID := UniqueID(5)
handler := NewNMockHandler(t)
handler.EXPECT().
CheckShouldDropChannel(mock.Anything, mock.Anything).
Return(true)
handler.EXPECT().FinishDropChannel(mock.Anything).Return(nil)
chManager, err := NewChannelManager(watchkv, handler)
require.NoError(t, err)

chManager.store.Add(1)
ops := getOpsWithWatchInfo(1, &channel{Name: "chan", CollectionID: collectionID})
err = chManager.store.Update(ops)
require.NoError(t, err)

assert.Equal(t, 1, chManager.store.GetNodeChannelCount(1))
err = chManager.Reassign(1, "chan")
assert.NoError(t, err)
assert.Equal(t, 0, chManager.store.GetNodeChannelCount(1))
})

t.Run("test Reassign-channel not found", func(t *testing.T) {
var chManager *ChannelManager
var err error
handler := NewNMockHandler(t)
handler.EXPECT().
CheckShouldDropChannel(mock.Anything, mock.Anything).
Run(func(channel string, collectionID int64) {
channels, err := chManager.store.Delete(1)
assert.NoError(t, err)
assert.Equal(t, 1, len(channels))
}).Return(true).Once()

chManager, err = NewChannelManager(watchkv, handler)
require.NoError(t, err)

chManager.store.Add(1)
ops := getOpsWithWatchInfo(1, &channel{Name: "chan", CollectionID: 1})
err = chManager.store.Update(ops)
require.NoError(t, err)

assert.Equal(t, 1, chManager.store.GetNodeChannelCount(1))
err = chManager.Reassign(1, "chan")
assert.Error(t, err)
})

t.Run("test CleanupAndReassign-channel not found", func(t *testing.T) {
var chManager *ChannelManager
var err error
handler := NewNMockHandler(t)
handler.EXPECT().
CheckShouldDropChannel(mock.Anything, mock.Anything).
Run(func(channel string, collectionID int64) {
channels, err := chManager.store.Delete(1)
assert.NoError(t, err)
assert.Equal(t, 1, len(channels))
}).Return(true).Once()

chManager, err = NewChannelManager(watchkv, handler)
require.NoError(t, err)

chManager.store.Add(1)
ops := getOpsWithWatchInfo(1, &channel{Name: "chan", CollectionID: 1})
err = chManager.store.Update(ops)
require.NoError(t, err)

assert.Equal(t, 1, chManager.store.GetNodeChannelCount(1))
err = chManager.CleanupAndReassign(1, "chan")
assert.Error(t, err)
})

t.Run("test CleanupAndReassign with get channel fail", func(t *testing.T) {
chManager, err := NewChannelManager(watchkv, newMockHandler())
require.NoError(t, err)

err = chManager.CleanupAndReassign(1, "not-exists-channelName")
assert.Error(t, err)
})

t.Run("test CleanupAndReassign with dropped channel", func(t *testing.T) {
handler := NewNMockHandler(t)
handler.EXPECT().
CheckShouldDropChannel(mock.Anything, mock.Anything).
Return(true)
handler.EXPECT().FinishDropChannel(mock.Anything).Return(nil)
chManager, err := NewChannelManager(watchkv, handler)
require.NoError(t, err)

chManager.store.Add(1)
ops := getOpsWithWatchInfo(1, &channel{Name: "chan", CollectionID: 1})
err = chManager.store.Update(ops)
require.NoError(t, err)

assert.Equal(t, 1, chManager.store.GetNodeChannelCount(1))
err = chManager.CleanupAndReassign(1, "chan")
assert.NoError(t, err)
assert.Equal(t, 0, chManager.store.GetNodeChannelCount(1))
})

t.Run("test DeleteNode", func(t *testing.T) {
defer watchkv.RemoveWithPrefix("")

Expand Down

0 comments on commit d2dbbbc

Please sign in to comment.