Skip to content

Commit

Permalink
Add meta cache to datanode for L0 Delta (#27768)
Browse files Browse the repository at this point in the history
Signed-off-by: aoiasd <[email protected]>
  • Loading branch information
aoiasd authored Oct 23, 2023
1 parent 4a6790b commit 9091a27
Show file tree
Hide file tree
Showing 2 changed files with 239 additions and 0 deletions.
133 changes: 133 additions & 0 deletions internal/datanode/metacache/meta_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metacache

import (
"sync"

"github.com/pingcap/log"
"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/proto/datapb"
)

// MetaCache is a per-vchannel, in-memory cache of segment metadata held by
// the datanode. Implementations must be safe for concurrent use.
type MetaCache interface {
	// NewSegment registers a segment under the given partition; it is a
	// no-op if the segment is already cached.
	NewSegment(segmentID, partitionID int64)
	// UpdateSegment removes dropSegmentIDs from the cache and registers
	// newSegmentID (if absent) in a single operation — e.g. replacing
	// compacted-away segments with their compaction result.
	UpdateSegment(newSegmentID, partitionID int64, dropSegmentIDs ...int64)
	// GetSegmentIDsBy returns the IDs of cached segments matching ALL of
	// the given filters; with no filters, every cached segment ID.
	GetSegmentIDsBy(filters ...SegmentFilter) []int64
}

type SegmentFilter func(info *SegmentInfo) bool

// SegmentInfo holds the metadata cached for a single segment.
type SegmentInfo struct {
	segmentID   int64 // unique segment ID
	partitionID int64 // ID of the partition the segment belongs to
}

// newSegmentInfo builds a SegmentInfo from a segment ID and its partition ID.
func newSegmentInfo(segmentID, partitionID int64) *SegmentInfo {
	info := SegmentInfo{
		segmentID:   segmentID,
		partitionID: partitionID,
	}
	return &info
}

func WithPartitionID(partitionID int64) func(info *SegmentInfo) bool {
return func(info *SegmentInfo) bool {
return info.partitionID == partitionID
}
}

// Compile-time assertion that *MetaCacheImpl satisfies MetaCache.
var _ MetaCache = (*MetaCacheImpl)(nil)

// MetaCacheImpl is the default MetaCache implementation: a mutex-guarded
// map of segmentID -> SegmentInfo for one virtual channel.
type MetaCacheImpl struct {
	collectionID int64                  // collection this vchannel belongs to
	vChannelName string                 // virtual channel name, used in log fields
	segmentInfos map[int64]*SegmentInfo // segmentID -> cached metadata; guarded by mu
	mu           sync.Mutex             // guards segmentInfos
}

// NewMetaCache creates a MetaCache for the given vchannel, pre-populated
// with the channel's flushed and unflushed segments.
func NewMetaCache(vchannel *datapb.VchannelInfo) MetaCache {
	impl := &MetaCacheImpl{
		segmentInfos: make(map[int64]*SegmentInfo),
		collectionID: vchannel.GetCollectionID(),
		vChannelName: vchannel.GetChannelName(),
	}
	impl.init(vchannel)
	return impl
}

// init seeds the cache with the flushed and unflushed (growing) segments
// carried in the vchannel watch info. Called once from NewMetaCache, before
// the cache is shared, so no locking is needed here.
func (c *MetaCacheImpl) init(vchannel *datapb.VchannelInfo) {
	add := func(segments []*datapb.SegmentInfo) {
		for _, seg := range segments {
			c.segmentInfos[seg.GetID()] = newSegmentInfo(seg.GetID(), seg.GetPartitionID())
		}
	}
	add(vchannel.FlushedSegments)
	add(vchannel.UnflushedSegments)
}

// NewSegment registers a freshly created segment under the given partition.
// It is a no-op when the segment is already cached.
func (c *MetaCacheImpl) NewSegment(segmentID, partitionID int64) {
	c.mu.Lock()
	defer c.mu.Unlock()

	if _, exists := c.segmentInfos[segmentID]; exists {
		return
	}
	c.segmentInfos[segmentID] = newSegmentInfo(segmentID, partitionID)
}

// UpdateSegment removes the dropped segments from the cache and registers the
// new segment (if absent) under a single lock acquisition. Dropped IDs that
// are not cached are logged and skipped.
func (c *MetaCacheImpl) UpdateSegment(newSegmentID, partitionID int64, dropSegmentIDs ...int64) {
	c.mu.Lock()
	defer c.mu.Unlock()

	for _, dropped := range dropSegmentIDs {
		if _, found := c.segmentInfos[dropped]; !found {
			log.Warn("some dropped segment not exist in meta cache",
				zap.String("channel", c.vChannelName),
				zap.Int64("collectionID", c.collectionID),
				zap.Int64("segmentID", dropped))
			continue
		}
		delete(c.segmentInfos, dropped)
	}

	if _, found := c.segmentInfos[newSegmentID]; !found {
		c.segmentInfos[newSegmentID] = newSegmentInfo(newSegmentID, partitionID)
	}
}

// GetSegmentIDsBy returns the IDs of all cached segments that satisfy every
// provided filter. With no filters it returns every cached segment ID. The
// result order is unspecified (Go map iteration order).
func (c *MetaCacheImpl) GetSegmentIDsBy(filters ...SegmentFilter) []int64 {
	c.mu.Lock()
	defer c.mu.Unlock()

	// matches reports whether info passes all filters (vacuously true when
	// no filters are supplied).
	matches := func(info *SegmentInfo) bool {
		for _, f := range filters {
			if !f(info) {
				return false
			}
		}
		return true
	}

	// Pre-size to the cache size to avoid repeated append growth; the
	// result can never exceed the number of cached segments.
	segments := make([]int64, 0, len(c.segmentInfos))
	for _, info := range c.segmentInfos {
		if matches(info) {
			segments = append(segments, info.segmentID)
		}
	}
	return segments
}
106 changes: 106 additions & 0 deletions internal/datanode/metacache/meta_cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metacache

import (
"testing"

"github.com/samber/lo"
"github.com/stretchr/testify/suite"

"github.com/milvus-io/milvus/internal/proto/datapb"
)

// MetaCacheSuite exercises MetaCache against four partitions, each seeded
// with one flushed and one growing segment, plus IDs reserved for segments
// created during the tests.
type MetaCacheSuite struct {
	suite.Suite

	collectionID    int64   // collection ID passed to NewMetaCache
	vchannel        string  // channel name passed to NewMetaCache
	invaliedSeg     int64   // NOTE(review): typo for "invalidSeg" — a segment ID never added to the cache
	partitionIDs    []int64 // partition for the segment at the same index
	flushedSegments []int64 // IDs seeded as flushed segments
	growingSegments []int64 // IDs seeded as unflushed (growing) segments
	newSegments     []int64 // IDs registered by the tests themselves
	cache           MetaCache
}

// SetupSuite fixes the collection, channel and segment IDs shared by every
// test in the suite.
func (s *MetaCacheSuite) SetupSuite() {
	s.collectionID = 1
	s.vchannel = "test"
	s.invaliedSeg = 111
	// One partition per index; flushed, growing and new segments at the
	// same index belong to the same partition.
	s.partitionIDs = []int64{1, 2, 3, 4}
	s.flushedSegments = []int64{1, 2, 3, 4}
	s.growingSegments = []int64{5, 6, 7, 8}
	s.newSegments = []int64{9, 10, 11, 12}
}

// SetupTest rebuilds the cache before each test, seeding it with the suite's
// flushed and growing segments.
func (s *MetaCacheSuite) SetupTest() {
	// makeInfos pairs each segment ID with the partition at the same index.
	makeInfos := func(ids []int64) []*datapb.SegmentInfo {
		infos := make([]*datapb.SegmentInfo, 0, len(ids))
		for i, id := range ids {
			infos = append(infos, &datapb.SegmentInfo{
				ID:          id,
				PartitionID: s.partitionIDs[i],
			})
		}
		return infos
	}

	s.cache = NewMetaCache(&datapb.VchannelInfo{
		CollectionID:      s.collectionID,
		ChannelName:       s.vchannel,
		FlushedSegments:   makeInfos(s.flushedSegments),
		UnflushedSegments: makeInfos(s.growingSegments),
	})
}

// TestNewSegment checks that newly registered segments are returned alongside
// the seeded flushed and growing ones when filtering by partition.
func (s *MetaCacheSuite) TestNewSegment() {
	for i, segID := range s.newSegments {
		s.cache.NewSegment(segID, s.partitionIDs[i])
	}

	for i, partitionID := range s.partitionIDs {
		expected := []int64{s.flushedSegments[i], s.growingSegments[i], s.newSegments[i]}
		got := s.cache.GetSegmentIDsBy(WithPartitionID(partitionID))
		s.Equal(len(expected), len(got))
		for _, segID := range got {
			s.True(lo.Contains(expected, segID))
		}
	}
}

// TestUpdateSegment checks that a compaction-style update leaves exactly the
// new segment in each partition, dropping the old ones (including an ID that
// was never cached, which the cache should tolerate).
func (s *MetaCacheSuite) TestUpdateSegment() {
	for i, segID := range s.newSegments {
		// compaction from flushed[i], unflushed[i] and invalidSeg to new[i]
		s.cache.UpdateSegment(segID, s.partitionIDs[i], s.flushedSegments[i], s.growingSegments[i], s.invaliedSeg)
	}

	for i, partitionID := range s.partitionIDs {
		got := s.cache.GetSegmentIDsBy(WithPartitionID(partitionID))
		s.Equal(1, len(got))
		for _, segID := range got {
			s.Equal(segID, s.newSegments[i])
		}
	}
}

// TestMetaCacheSuite wires the suite into `go test`.
func TestMetaCacheSuite(t *testing.T) {
	suite.Run(t, &MetaCacheSuite{})
}

0 comments on commit 9091a27

Please sign in to comment.