Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fix checkGeneralCapacity slowly #37976

Merged
merged 2 commits into from
Dec 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 4 additions & 21 deletions internal/rootcoord/constrant.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"

"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

const (
Expand All @@ -35,7 +34,6 @@ func checkGeneralCapacity(ctx context.Context, newColNum int,
newParNum int64,
newShardNum int32,
core *Core,
ts typeutil.Timestamp,
) error {
var addedNum int64 = 0
if newColNum > 0 && newParNum > 0 && newShardNum > 0 {
Expand All @@ -46,25 +44,10 @@ func checkGeneralCapacity(ctx context.Context, newColNum int,
addedNum += newParNum
}

var generalNum int64 = 0
collectionsMap := core.meta.ListAllAvailCollections(ctx)
for dbId, collectionIDs := range collectionsMap {
db, err := core.meta.GetDatabaseByID(ctx, dbId, ts)
if err == nil {
for _, collectionId := range collectionIDs {
collection, err := core.meta.GetCollectionByID(ctx, db.Name, collectionId, ts, true)
if err == nil {
partNum := int64(collection.GetPartitionNum(false))
shardNum := int64(collection.ShardsNum)
generalNum += partNum * shardNum
}
}
}
}

generalNum += addedNum
if generalNum > Params.RootCoordCfg.MaxGeneralCapacity.GetAsInt64() {
return merr.WrapGeneralCapacityExceed(generalNum, Params.RootCoordCfg.MaxGeneralCapacity.GetAsInt64(),
generalCount := core.meta.GetGeneralCount(ctx)
generalCount += int(addedNum)
if generalCount > Params.RootCoordCfg.MaxGeneralCapacity.GetAsInt() {
return merr.WrapGeneralCapacityExceed(generalCount, Params.RootCoordCfg.MaxGeneralCapacity.GetAsInt64(),
"failed checking constraint: sum_collections(parition*shard) exceeding the max general capacity:")
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion internal/rootcoord/create_collection_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (t *createCollectionTask) validate() error {
if t.Req.GetNumPartitions() > 0 {
newPartNum = t.Req.GetNumPartitions()
}
return checkGeneralCapacity(t.ctx, 1, newPartNum, t.Req.GetShardsNum(), t.core, t.ts)
return checkGeneralCapacity(t.ctx, 1, newPartNum, t.Req.GetShardsNum(), t.core)
}

// checkMaxCollectionsPerDB DB properties take precedence over quota configurations for max collections.
Expand Down
31 changes: 4 additions & 27 deletions internal/rootcoord/create_collection_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,23 +246,7 @@ func Test_createCollectionTask_validate(t *testing.T) {
meta.EXPECT().ListAllAvailCollections(mock.Anything).Return(map[int64][]int64{1: {1, 2}})
meta.EXPECT().GetDatabaseByName(mock.Anything, mock.Anything, mock.Anything).
Return(&model.Database{Name: "db1"}, nil).Once()

meta.On("GetDatabaseByID",
mock.Anything, mock.Anything, mock.Anything,
).Return(&model.Database{
Name: "default",
}, nil)
meta.On("GetCollectionByID",
mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything,
).Return(&model.Collection{
Name: "default",
ShardsNum: 2,
Partitions: []*model.Partition{
{
PartitionID: 1,
},
},
}, nil)
meta.EXPECT().GetGeneralCount(mock.Anything).Return(1)

core := newTestCore(withMeta(meta))

Expand Down Expand Up @@ -295,8 +279,7 @@ func Test_createCollectionTask_validate(t *testing.T) {
},
},
}, nil).Once()
meta.EXPECT().GetDatabaseByID(mock.Anything, mock.Anything, mock.Anything).
Return(nil, errors.New("mock"))
meta.EXPECT().GetGeneralCount(mock.Anything).Return(0)

core := newTestCore(withMeta(meta))
task := createCollectionTask{
Expand Down Expand Up @@ -734,6 +717,7 @@ func Test_createCollectionTask_Prepare(t *testing.T) {
).Return(map[int64][]int64{
util.DefaultDBID: {1, 2},
}, nil)
meta.EXPECT().GetGeneralCount(mock.Anything).Return(0)

paramtable.Get().Save(Params.QuotaConfig.MaxCollectionNum.Key, strconv.Itoa(math.MaxInt64))
defer paramtable.Get().Reset(Params.QuotaConfig.MaxCollectionNum.Key)
Expand All @@ -754,8 +738,6 @@ func Test_createCollectionTask_Prepare(t *testing.T) {
})

t.Run("invalid schema", func(t *testing.T) {
meta.On("GetDatabaseByID", mock.Anything,
mock.Anything, mock.Anything).Return(nil, errors.New("mock"))
core := newTestCore(withMeta(meta))
collectionName := funcutil.GenRandomStr()
task := &createCollectionTask{
Expand Down Expand Up @@ -784,8 +766,6 @@ func Test_createCollectionTask_Prepare(t *testing.T) {
}
marshaledSchema, err := proto.Marshal(schema)
assert.NoError(t, err)
meta.On("GetDatabaseByID", mock.Anything,
mock.Anything, mock.Anything).Return(nil, errors.New("mock"))
core := newTestCore(withInvalidIDAllocator(), withMeta(meta))

task := createCollectionTask{
Expand All @@ -808,8 +788,6 @@ func Test_createCollectionTask_Prepare(t *testing.T) {
field1 := funcutil.GenRandomStr()

ticker := newRocksMqTtSynchronizer()
meta.On("GetDatabaseByID", mock.Anything,
mock.Anything, mock.Anything).Return(nil, errors.New("mock"))

core := newTestCore(withValidIDAllocator(), withTtSynchronizer(ticker), withMeta(meta))

Expand Down Expand Up @@ -1160,8 +1138,7 @@ func Test_createCollectionTask_PartitionKey(t *testing.T) {
).Return(map[int64][]int64{
util.DefaultDBID: {1, 2},
}, nil)
meta.On("GetDatabaseByID", mock.Anything,
mock.Anything, mock.Anything).Return(nil, errors.New("mock"))
meta.EXPECT().GetGeneralCount(mock.Anything).Return(0)

paramtable.Get().Save(Params.QuotaConfig.MaxCollectionNum.Key, strconv.Itoa(math.MaxInt64))
defer paramtable.Get().Reset(Params.QuotaConfig.MaxCollectionNum.Key)
Expand Down
2 changes: 1 addition & 1 deletion internal/rootcoord/create_partition_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (t *createPartitionTask) Prepare(ctx context.Context) error {
return err
}
t.collMeta = collMeta
return checkGeneralCapacity(ctx, 0, 1, 0, t.core, t.ts)
return checkGeneralCapacity(ctx, 0, 1, 0, t.core)
}

func (t *createPartitionTask) Execute(ctx context.Context) error {
Expand Down
10 changes: 1 addition & 9 deletions internal/rootcoord/create_partition_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"testing"

"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

Expand Down Expand Up @@ -62,14 +61,7 @@ func Test_createPartitionTask_Prepare(t *testing.T) {
mock.Anything,
mock.Anything,
).Return(coll.Clone(), nil)
meta.On("ListAllAvailCollections",
mock.Anything,
).Return(map[int64][]int64{
1: {1, 2},
}, nil)
meta.On("GetDatabaseByID",
mock.Anything, mock.Anything, mock.Anything,
).Return(nil, errors.New("mock"))
meta.EXPECT().GetGeneralCount(mock.Anything).Return(0)

core := newTestCore(withMeta(meta))
task := &createPartitionTask{
Expand Down
19 changes: 19 additions & 0 deletions internal/rootcoord/meta_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
ListAliases(ctx context.Context, dbName string, collectionName string, ts Timestamp) ([]string, error)
AlterCollection(ctx context.Context, oldColl *model.Collection, newColl *model.Collection, ts Timestamp) error
RenameCollection(ctx context.Context, dbName string, oldName string, newDBName string, newName string, ts Timestamp) error
GetGeneralCount(ctx context.Context) int

// TODO: it'll be a big cost if we handle the time travel logic, since we should always list all aliases in catalog.
IsAlias(ctx context.Context, db, name string) bool
Expand Down Expand Up @@ -115,6 +116,8 @@
dbName2Meta map[string]*model.Database // database name -> db meta
collID2Meta map[typeutil.UniqueID]*model.Collection // collection id -> collection meta

generalCnt int // sum of product of partition number and shard number

// collections *collectionDb
names *nameDb
aliases *nameDb
Expand Down Expand Up @@ -189,6 +192,7 @@
}
for _, collection := range collections {
mt.collID2Meta[collection.CollectionID] = collection
mt.generalCnt += len(collection.Partitions) * int(collection.ShardsNum)
if collection.Available() {
mt.names.insert(dbName, collection.Name, collection.CollectionID)
collectionNum++
Expand Down Expand Up @@ -417,6 +421,8 @@
mt.collID2Meta[coll.CollectionID] = coll.Clone()
mt.names.insert(db.Name, coll.Name, coll.CollectionID)

mt.generalCnt += len(coll.Partitions) * int(coll.ShardsNum)

log.Ctx(ctx).Info("add collection to meta table",
zap.Int64("dbID", coll.DBID),
zap.String("collection", coll.Name),
Expand Down Expand Up @@ -521,6 +527,8 @@
mt.removeAllNamesIfMatchedInternal(collectionID, allNames)
mt.removeCollectionByIDInternal(collectionID)

mt.generalCnt -= len(coll.Partitions) * int(coll.ShardsNum)

log.Ctx(ctx).Info("remove collection",
zap.Int64("dbID", coll.DBID),
zap.String("name", coll.Name),
Expand Down Expand Up @@ -895,6 +903,8 @@
}
mt.collID2Meta[partition.CollectionID].Partitions = append(mt.collID2Meta[partition.CollectionID].Partitions, partition.Clone())

mt.generalCnt += int(coll.ShardsNum) // 1 partition * shardNum

metrics.RootCoordNumOfPartitions.WithLabelValues().Inc()

log.Ctx(ctx).Info("add partition to meta table",
Expand Down Expand Up @@ -961,6 +971,7 @@
}
if loc != -1 {
coll.Partitions = append(coll.Partitions[:loc], coll.Partitions[loc+1:]...)
mt.generalCnt -= int(coll.ShardsNum) // 1 partition * shardNum

Check warning on line 974 in internal/rootcoord/meta_table.go

View check run for this annotation

Codecov / codecov/patch

internal/rootcoord/meta_table.go#L974

Added line #L974 was not covered by tests
}
log.Info("remove partition", zap.Int64("collection", collectionID), zap.Int64("partition", partitionID), zap.Uint64("ts", ts))
return nil
Expand Down Expand Up @@ -1229,6 +1240,14 @@
return mt.listAliasesByID(collID)
}

// GetGeneralCount gets the general count(sum of product of partition number and shard number).
func (mt *MetaTable) GetGeneralCount(ctx context.Context) int {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()

return mt.generalCnt
}

// AddCredential add credential
func (mt *MetaTable) AddCredential(ctx context.Context, credInfo *internalpb.CredentialInfo) error {
if credInfo.Username == "" {
Expand Down
46 changes: 46 additions & 0 deletions internal/rootcoord/mocks/meta_table.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading