diff --git a/internal/datacoord/dataview/data_view.go b/internal/datacoord/dataview/data_view.go new file mode 100644 index 0000000000000..c34b59b820373 --- /dev/null +++ b/internal/datacoord/dataview/data_view.go @@ -0,0 +1,28 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dataview + +import "github.com/milvus-io/milvus/internal/proto/datapb" + +const InitialDataViewVersion = 0 + +type DataView struct { + CollectionID int64 + Channels map[string]*datapb.VchannelInfo + Segments map[int64]struct{} + Version int64 +} diff --git a/internal/datacoord/dataview/update_chan.go b/internal/datacoord/dataview/update_chan.go new file mode 100644 index 0000000000000..ba139d68bcd45 --- /dev/null +++ b/internal/datacoord/dataview/update_chan.go @@ -0,0 +1,33 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dataview + +import "sync" + +var updateChan chan int64 +var initOnce sync.Once + +func initUpdateChan() { + initOnce.Do(func() { + updateChan = make(chan int64, 1024) + }) +} + +// NotifyUpdate used to trigger updating data view immediately. +func NotifyUpdate(collectionID int64) { + updateChan <- collectionID +} diff --git a/internal/datacoord/dataview/view_manager.go b/internal/datacoord/dataview/view_manager.go new file mode 100644 index 0000000000000..383ac1c3ec239 --- /dev/null +++ b/internal/datacoord/dataview/view_manager.go @@ -0,0 +1,156 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dataview + +import ( + "sync" + "time" + + "go.uber.org/zap" + + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/funcutil" + "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +type PullNewDataViewFunction func(collectionID int64) (*DataView, error) + +type ViewManager interface { + Get(collectionID int64) (*DataView, error) + GetVersion(collectionID int64) int64 + + Start() + Close() +} + +type dataViewManager struct { + pullFn PullNewDataViewFunction + currentViews *typeutil.ConcurrentMap[int64, *DataView] + + closeOnce sync.Once + closeChan chan struct{} +} + +func NewDataViewManager(pullFn PullNewDataViewFunction) ViewManager { + initUpdateChan() + return &dataViewManager{ + pullFn: pullFn, + currentViews: typeutil.NewConcurrentMap[int64, *DataView](), + } +} + +func (m *dataViewManager) Get(collectionID int64) (*DataView, error) { + if view, ok := m.currentViews.Get(collectionID); ok { + return view, nil + } + view, err := m.pullFn(collectionID) + if err != nil { + return nil, err + } + m.currentViews.GetOrInsert(collectionID, view) + return view, nil +} + +func (m *dataViewManager) GetVersion(collectionID int64) int64 { + if view, ok := m.currentViews.Get(collectionID); ok { + return view.Version + } + return InitialDataViewVersion +} + +func (m *dataViewManager) Start() { + ticker := time.NewTicker(paramtable.Get().DataCoordCfg.DataViewUpdateInterval.GetAsDuration(time.Second)) + defer ticker.Stop() + for { + select { + case <-m.closeChan: + log.Info("data view manager exited") + return + case <-ticker.C: + // periodically update all data view + for _, collectionID := range m.currentViews.Keys() { + m.TryUpdateDataView(collectionID) + } + case collectionID := <-updateChan: + m.TryUpdateDataView(collectionID) + } + } +} + +func (m *dataViewManager) Close() { + m.closeOnce.Do(func() { + close(m.closeChan) + }) +} + +func (m *dataViewManager) update(view *DataView) { + _, ok := m.currentViews.GetOrInsert(view.CollectionID, view) + if ok { + log.Info("update new data view", zap.Int64("collectionID", view.CollectionID), zap.Int64("version", view.Version)) + } +} + +func (m *dataViewManager) TryUpdateDataView(collectionID int64) { + newView, err := m.pullFn(collectionID) + if err != nil { + log.Warn("pull new data view failed", zap.Int64("collectionID", collectionID), zap.Error(err)) + // notify to trigger pull again + NotifyUpdate(collectionID) + return + } + + currentView, ok := m.currentViews.Get(collectionID) + if !ok { + m.currentViews.GetOrInsert(collectionID, newView) + return + } + // no-op if the incoming version is less than the current version. + if newView.Version <= currentView.Version { + return + } + + // check if channel info has been updated. + for channel, new := range newView.Channels { + current, ok := currentView.Channels[channel] + if !ok { + m.update(newView) + return + } + if !funcutil.SliceSetEqual(new.GetLevelZeroSegmentIds(), current.GetLevelZeroSegmentIds()) || + !funcutil.SliceSetEqual(new.GetUnflushedSegmentIds(), current.GetUnflushedSegmentIds()) || + !funcutil.SliceSetEqual(new.GetFlushedSegmentIds(), current.GetFlushedSegmentIds()) || + !funcutil.SliceSetEqual(new.GetIndexedSegmentIds(), current.GetIndexedSegmentIds()) || + !funcutil.SliceSetEqual(new.GetDroppedSegmentIds(), current.GetDroppedSegmentIds()) { + m.update(newView) + return + } + if !typeutil.MapEqual(new.GetPartitionStatsVersions(), current.GetPartitionStatsVersions()) { + m.update(newView) + return + } + if new.GetSeekPosition().GetTimestamp() > current.GetSeekPosition().GetTimestamp() { + m.update(newView) + return + } + } + + // check if segment info has been updated. + if !typeutil.MapEqual(newView.Segments, currentView.Segments) { + m.currentViews.GetOrInsert(collectionID, newView) + } +} diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index c735681951f3f..5ca899779e2f0 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -26,6 +26,8 @@ import ( "syscall" "time" + "github.com/milvus-io/milvus/internal/datacoord/dataview" + "github.com/blang/semver/v4" "github.com/cockroachdb/errors" "github.com/samber/lo" @@ -125,6 +127,7 @@ type Server struct { importMeta ImportMeta importScheduler ImportScheduler importChecker ImportChecker + viewManager dataview.ViewManager compactionTrigger trigger compactionHandler compactionPlanContext @@ -415,6 +418,8 @@ func (s *Server) initDataCoord() error { s.importScheduler = NewImportScheduler(s.meta, s.cluster, s.allocator, s.importMeta) s.importChecker = NewImportChecker(s.meta, s.broker, s.cluster, s.allocator, s.importMeta, s.jobManager) + s.viewManager = dataview.NewDataViewManager(s.pullNewDataView) + s.syncSegmentsScheduler = newSyncSegmentsScheduler(s.meta, s.channelManager, s.sessionManager) s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx) @@ -751,6 +756,7 @@ func (s *Server) startServerLoop() { s.startFlushLoop(s.serverLoopCtx) go s.importScheduler.Start() go s.importChecker.Start() + go s.viewManager.Start() s.garbageCollector.start() if !(streamingutil.IsStreamingServiceEnabled() || paramtable.Get().DataNodeCfg.SkipBFStatsLoad.GetAsBool()) { @@ -1091,6 +1097,7 @@ func (s *Server) Stop() error { s.importScheduler.Close() s.importChecker.Close() + s.viewManager.Close() s.syncSegmentsScheduler.Stop() s.stopCompaction() diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 3b263a9383fee..7f0d73e20096a 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -19,6 +19,7 @@ package datacoord import ( "context" "fmt" + "github.com/milvus-io/milvus/internal/datacoord/dataview" "math" "strconv" "time" @@ -844,13 +845,35 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf return resp, nil } +// GetDataViewVersions retrieves the data view versions of the target collections. +func (s *Server) GetDataViewVersions(ctx context.Context, req *datapb.GetDataViewVersionsRequest) (*datapb.GetDataViewVersionsResponse, error) { + log := log.Ctx(ctx).With(zap.Int("numCollections", len(req.GetCollectionIDs()))) + log.Info("GetDataViewVersions request received") + resp := &datapb.GetDataViewVersionsResponse{ + Status: merr.Success(), + } + if err := merr.CheckHealthy(s.GetStateCode()); err != nil { + return &datapb.GetDataViewVersionsResponse{ + Status: merr.Status(err), + }, nil + } + + versions := make(map[int64]int64, len(req.GetCollectionIDs())) + for _, id := range req.GetCollectionIDs() { + versions[id] = s.viewManager.GetVersion(id) + } + + resp.DataViewVersions = versions + log.Info("GetDataViewVersions done") + return resp, nil +} + // GetRecoveryInfoV2 get recovery info for segment // Called by: QueryCoord. func (s *Server) GetRecoveryInfoV2(ctx context.Context, req *datapb.GetRecoveryInfoRequestV2) (*datapb.GetRecoveryInfoResponseV2, error) { - log := log.Ctx(ctx) collectionID := req.GetCollectionID() partitionIDs := req.GetPartitionIDs() - log = log.With( + log := log.Ctx(ctx).With( zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs), ) @@ -863,13 +886,21 @@ func (s *Server) GetRecoveryInfoV2(ctx context.Context, req *datapb.GetRecoveryI Status: merr.Status(err), }, nil } - channels := s.channelManager.GetChannelsByCollectionID(collectionID) - channelInfos := make([]*datapb.VchannelInfo, 0, len(channels)) - flushedIDs := make(typeutil.UniqueSet) - for _, ch := range channels { - channelInfo := s.handler.GetQueryVChanPositions(ch, partitionIDs...) + + dataView, err := s.viewManager.Get(req.GetCollectionID()) + if err != nil { + log.Warn("get data view failed in GetRecoveryInfoV2", zap.Error(err)) + resp.Status = merr.Status(err) + return resp, nil + } + + channelInfos := make([]*datapb.VchannelInfo, 0, len(dataView.Channels)) + for _, info := range dataView.Channels { + channelInfo := typeutil.Clone(info) + // retrieve target partition stats versions + channelInfo.PartitionStatsVersions = lo.PickByKeys(channelInfo.PartitionStatsVersions, req.GetPartitionIDs()) channelInfos = append(channelInfos, channelInfo) - log.Info("datacoord append channelInfo in GetRecoveryInfo", + log.Info("datacoord append channelInfo in GetRecoveryInfoV2", zap.String("channel", channelInfo.GetChannelName()), zap.Int("# of unflushed segments", len(channelInfo.GetUnflushedSegmentIds())), zap.Int("# of flushed segments", len(channelInfo.GetFlushedSegmentIds())), @@ -877,31 +908,17 @@ func (s *Server) GetRecoveryInfoV2(ctx context.Context, req *datapb.GetRecoveryI zap.Int("# of indexed segments", len(channelInfo.GetIndexedSegmentIds())), zap.Int("# of l0 segments", len(channelInfo.GetLevelZeroSegmentIds())), ) - flushedIDs.Insert(channelInfo.GetFlushedSegmentIds()...) } segmentInfos := make([]*datapb.SegmentInfo, 0) - for id := range flushedIDs { + for id := range dataView.Segments { segment := s.meta.GetSegment(id) if segment == nil { - err := merr.WrapErrSegmentNotFound(id) + err = merr.WrapErrSegmentNotFound(id) log.Warn("failed to get segment", zap.Int64("segmentID", id)) resp.Status = merr.Status(err) return resp, nil } - // Skip non-flushing, non-flushed and dropped segments. - if segment.State != commonpb.SegmentState_Flushed && segment.State != commonpb.SegmentState_Flushing && segment.State != commonpb.SegmentState_Dropped { - continue - } - // Also skip bulk insert segments. - if segment.GetIsImporting() { - continue - } - - binlogs := segment.GetBinlogs() - if len(binlogs) == 0 && segment.GetLevel() != datapb.SegmentLevel_L0 { - continue - } rowCount := segmentutil.CalcRowCountFromBinLog(segment.SegmentInfo) if rowCount != segment.NumOfRows && rowCount > 0 { log.Warn("segment row number meta inconsistent with bin log row count and will be corrected", @@ -911,7 +928,6 @@ func (s *Server) GetRecoveryInfoV2(ctx context.Context, req *datapb.GetRecoveryI } else { rowCount = segment.NumOfRows } - segmentInfos = append(segmentInfos, &datapb.SegmentInfo{ ID: segment.ID, PartitionID: segment.PartitionID, @@ -928,6 +944,68 @@ func (s *Server) GetRecoveryInfoV2(ctx context.Context, req *datapb.GetRecoveryI return resp, nil } +func (s *Server) pullNewDataView(collectionID int64) (*dataview.DataView, error) { + version := time.Now().UnixNano() + log := log.With( + zap.Int64("collectionID", collectionID), + zap.Int64("version", version), + ) + + channels := s.channelManager.GetChannelsByCollectionID(collectionID) + channelInfos := make([]*datapb.VchannelInfo, 0, len(channels)) + flushedIDs := make(typeutil.UniqueSet) + + for _, ch := range channels { + channelInfo := s.handler.GetQueryVChanPositions(ch, allPartitionID) + channelInfos = append(channelInfos, channelInfo) + log.Info("datacoord append channelInfo in pullNewDataView", + zap.String("channel", channelInfo.GetChannelName()), + zap.Int("# of unflushed segments", len(channelInfo.GetUnflushedSegmentIds())), + zap.Int("# of flushed segments", len(channelInfo.GetFlushedSegmentIds())), + zap.Int("# of dropped segments", len(channelInfo.GetDroppedSegmentIds())), + zap.Int("# of indexed segments", len(channelInfo.GetIndexedSegmentIds())), + zap.Int("# of l0 segments", len(channelInfo.GetLevelZeroSegmentIds())), + ) + flushedIDs.Insert(channelInfo.GetFlushedSegmentIds()...) + } + + segments := make([]int64, 0) + for id := range flushedIDs { + segment := s.meta.GetSegment(id) + if segment == nil { + err := merr.WrapErrSegmentNotFound(id) + log.Warn("failed to get segment", zap.Int64("segmentID", id)) + return nil, err + } + // Skip non-flushing, non-flushed and dropped segments. + if segment.State != commonpb.SegmentState_Flushed && segment.State != commonpb.SegmentState_Flushing && segment.State != commonpb.SegmentState_Dropped { + continue + } + // Also skip bulk insert segments. + if segment.GetIsImporting() { + continue + } + + binlogs := segment.GetBinlogs() + if len(binlogs) == 0 && segment.GetLevel() != datapb.SegmentLevel_L0 { + continue + } + segments = append(segments, id) + } + + newDV := &dataview.DataView{ + CollectionID: collectionID, + Channels: lo.KeyBy(channelInfos, func(v *datapb.VchannelInfo) string { + return v.GetChannelName() + }), + Segments: lo.SliceToMap(segments, func(id int64) (int64, struct{}) { + return id, struct{}{} + }), + Version: version, + } + return newDV, nil +} + // GetChannelRecoveryInfo get recovery channel info. // Called by: StreamingNode. func (s *Server) GetChannelRecoveryInfo(ctx context.Context, req *datapb.GetChannelRecoveryInfoRequest) (*datapb.GetChannelRecoveryInfoResponse, error) { diff --git a/internal/distributed/datacoord/client/client.go b/internal/distributed/datacoord/client/client.go index bb095cdae30b0..546933adb0cd1 100644 --- a/internal/distributed/datacoord/client/client.go +++ b/internal/distributed/datacoord/client/client.go @@ -321,6 +321,18 @@ func (c *Client) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath }) } +// GetDataViewVersions retrieves the data view versions of the target collections. +func (c *Client) GetDataViewVersions(ctx context.Context, req *datapb.GetDataViewVersionsRequest, opts ...grpc.CallOption) (*datapb.GetDataViewVersionsResponse, error) { + req = typeutil.Clone(req) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) + return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*datapb.GetDataViewVersionsResponse, error) { + return client.GetDataViewVersions(ctx, req) + }) +} + // GetRecoveryInfo request segment recovery info of collection/partition // // ctx is the context to control request deadline and cancellation diff --git a/internal/distributed/datacoord/service.go b/internal/distributed/datacoord/service.go index ee17f8c0d3a03..c76a76e04b798 100644 --- a/internal/distributed/datacoord/service.go +++ b/internal/distributed/datacoord/service.go @@ -350,6 +350,11 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath return s.dataCoord.SaveBinlogPaths(ctx, req) } +// GetDataViewVersions retrieves the data view versions of the target collections. +func (s *Server) GetDataViewVersions(ctx context.Context, req *datapb.GetDataViewVersionsRequest) (*datapb.GetDataViewVersionsResponse, error) { + return s.dataCoord.GetDataViewVersions(ctx, req) +} + // GetRecoveryInfo gets information for recovering channels func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInfoRequest) (*datapb.GetRecoveryInfoResponse, error) { return s.dataCoord.GetRecoveryInfo(ctx, req) diff --git a/internal/mocks/mock_datacoord.go b/internal/mocks/mock_datacoord.go index b33f31f3c155f..363910eda1ecb 100644 --- a/internal/mocks/mock_datacoord.go +++ b/internal/mocks/mock_datacoord.go @@ -1041,6 +1041,65 @@ func (_c *MockDataCoord_GetComponentStates_Call) RunAndReturn(run func(context.C return _c } +// GetDataViewVersions provides a mock function with given fields: _a0, _a1 +func (_m *MockDataCoord) GetDataViewVersions(_a0 context.Context, _a1 *datapb.GetDataViewVersionsRequest) (*datapb.GetDataViewVersionsResponse, error) { + ret := _m.Called(_a0, _a1) + + if len(ret) == 0 { + panic("no return value specified for GetDataViewVersions") + } + + var r0 *datapb.GetDataViewVersionsResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *datapb.GetDataViewVersionsRequest) (*datapb.GetDataViewVersionsResponse, error)); ok { + return rf(_a0, _a1) + } + if rf, ok := ret.Get(0).(func(context.Context, *datapb.GetDataViewVersionsRequest) *datapb.GetDataViewVersionsResponse); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*datapb.GetDataViewVersionsResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *datapb.GetDataViewVersionsRequest) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockDataCoord_GetDataViewVersions_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetDataViewVersions' +type MockDataCoord_GetDataViewVersions_Call struct { + *mock.Call +} + +// GetDataViewVersions is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 *datapb.GetDataViewVersionsRequest +func (_e *MockDataCoord_Expecter) GetDataViewVersions(_a0 interface{}, _a1 interface{}) *MockDataCoord_GetDataViewVersions_Call { + return &MockDataCoord_GetDataViewVersions_Call{Call: _e.mock.On("GetDataViewVersions", _a0, _a1)} +} + +func (_c *MockDataCoord_GetDataViewVersions_Call) Run(run func(_a0 context.Context, _a1 *datapb.GetDataViewVersionsRequest)) *MockDataCoord_GetDataViewVersions_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*datapb.GetDataViewVersionsRequest)) + }) + return _c +} + +func (_c *MockDataCoord_GetDataViewVersions_Call) Return(_a0 *datapb.GetDataViewVersionsResponse, _a1 error) *MockDataCoord_GetDataViewVersions_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockDataCoord_GetDataViewVersions_Call) RunAndReturn(run func(context.Context, *datapb.GetDataViewVersionsRequest) (*datapb.GetDataViewVersionsResponse, error)) *MockDataCoord_GetDataViewVersions_Call { + _c.Call.Return(run) + return _c +} + // GetFlushAllState provides a mock function with given fields: _a0, _a1 func (_m *MockDataCoord) GetFlushAllState(_a0 context.Context, _a1 *milvuspb.GetFlushAllStateRequest) (*milvuspb.GetFlushAllStateResponse, error) { ret := _m.Called(_a0, _a1) diff --git a/internal/mocks/mock_datacoord_client.go b/internal/mocks/mock_datacoord_client.go index 9f42a0f953877..c473c8d812f0f 100644 --- a/internal/mocks/mock_datacoord_client.go +++ b/internal/mocks/mock_datacoord_client.go @@ -1336,6 +1336,80 @@ func (_c *MockDataCoordClient_GetComponentStates_Call) RunAndReturn(run func(con return _c } +// GetDataViewVersions provides a mock function with given fields: ctx, in, opts +func (_m *MockDataCoordClient) GetDataViewVersions(ctx context.Context, in *datapb.GetDataViewVersionsRequest, opts ...grpc.CallOption) (*datapb.GetDataViewVersionsResponse, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, in) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + if len(ret) == 0 { + panic("no return value specified for GetDataViewVersions") + } + + var r0 *datapb.GetDataViewVersionsResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *datapb.GetDataViewVersionsRequest, ...grpc.CallOption) (*datapb.GetDataViewVersionsResponse, error)); ok { + return rf(ctx, in, opts...) + } + if rf, ok := ret.Get(0).(func(context.Context, *datapb.GetDataViewVersionsRequest, ...grpc.CallOption) *datapb.GetDataViewVersionsResponse); ok { + r0 = rf(ctx, in, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*datapb.GetDataViewVersionsResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *datapb.GetDataViewVersionsRequest, ...grpc.CallOption) error); ok { + r1 = rf(ctx, in, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockDataCoordClient_GetDataViewVersions_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetDataViewVersions' +type MockDataCoordClient_GetDataViewVersions_Call struct { + *mock.Call +} + +// GetDataViewVersions is a helper method to define mock.On call +// - ctx context.Context +// - in *datapb.GetDataViewVersionsRequest +// - opts ...grpc.CallOption +func (_e *MockDataCoordClient_Expecter) GetDataViewVersions(ctx interface{}, in interface{}, opts ...interface{}) *MockDataCoordClient_GetDataViewVersions_Call { + return &MockDataCoordClient_GetDataViewVersions_Call{Call: _e.mock.On("GetDataViewVersions", + append([]interface{}{ctx, in}, opts...)...)} +} + +func (_c *MockDataCoordClient_GetDataViewVersions_Call) Run(run func(ctx context.Context, in *datapb.GetDataViewVersionsRequest, opts ...grpc.CallOption)) *MockDataCoordClient_GetDataViewVersions_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]grpc.CallOption, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(grpc.CallOption) + } + } + run(args[0].(context.Context), args[1].(*datapb.GetDataViewVersionsRequest), variadicArgs...) + }) + return _c +} + +func (_c *MockDataCoordClient_GetDataViewVersions_Call) Return(_a0 *datapb.GetDataViewVersionsResponse, _a1 error) *MockDataCoordClient_GetDataViewVersions_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockDataCoordClient_GetDataViewVersions_Call) RunAndReturn(run func(context.Context, *datapb.GetDataViewVersionsRequest, ...grpc.CallOption) (*datapb.GetDataViewVersionsResponse, error)) *MockDataCoordClient_GetDataViewVersions_Call { + _c.Call.Return(run) + return _c +} + // GetFlushAllState provides a mock function with given fields: ctx, in, opts func (_m *MockDataCoordClient) GetFlushAllState(ctx context.Context, in *milvuspb.GetFlushAllStateRequest, opts ...grpc.CallOption) (*milvuspb.GetFlushAllStateResponse, error) { _va := make([]interface{}, len(opts)) diff --git a/internal/proto/data_coord.proto b/internal/proto/data_coord.proto index ecab1cdcfb07c..2cc6cd2c6d8de 100644 --- a/internal/proto/data_coord.proto +++ b/internal/proto/data_coord.proto @@ -52,6 +52,7 @@ service DataCoord { rpc GetSegmentInfoChannel(GetSegmentInfoChannelRequest) returns (milvus.StringResponse){} rpc SaveBinlogPaths(SaveBinlogPathsRequest) returns (common.Status){} + rpc GetDataViewVersions(GetDataViewVersionsRequest) returns (GetDataViewVersionsResponse){} rpc GetRecoveryInfo(GetRecoveryInfoRequest) returns (GetRecoveryInfoResponse){} rpc GetRecoveryInfoV2(GetRecoveryInfoRequestV2) returns (GetRecoveryInfoResponseV2){} rpc GetChannelRecoveryInfo(GetChannelRecoveryInfoRequest) returns (GetChannelRecoveryInfoResponse){} @@ -462,6 +463,16 @@ message Binlog { int64 memory_size = 7; } +message GetDataViewVersionsRequest { + common.MsgBase base = 1; + repeated int64 collectionIDs = 2; +} + +message GetDataViewVersionsResponse { + common.Status status = 1; + map data_view_versions = 2; +} + message GetRecoveryInfoResponse { common.Status status = 1; repeated VchannelInfo channels = 2; diff --git a/internal/querycoordv2/meta/coordinator_broker.go b/internal/querycoordv2/meta/coordinator_broker.go index e6ab9a104f821..61188cc30a7a9 100644 --- a/internal/querycoordv2/meta/coordinator_broker.go +++ b/internal/querycoordv2/meta/coordinator_broker.go @@ -49,6 +49,7 @@ type Broker interface { ListIndexes(ctx context.Context, collectionID UniqueID) ([]*indexpb.IndexInfo, error) GetSegmentInfo(ctx context.Context, segmentID ...UniqueID) ([]*datapb.SegmentInfo, error) GetIndexInfo(ctx context.Context, collectionID UniqueID, segmentIDs ...UniqueID) (map[int64][]*querypb.FieldIndexInfo, error) + GetDataViewVersions(ctx context.Context, collectionIDs []int64) (map[int64]int64, error) GetRecoveryInfoV2(ctx context.Context, collectionID UniqueID, partitionIDs ...UniqueID) ([]*datapb.VchannelInfo, []*datapb.SegmentInfo, error) DescribeDatabase(ctx context.Context, dbName string) (*rootcoordpb.DescribeDatabaseResponse, error) GetCollectionLoadInfo(ctx context.Context, collectionID UniqueID) ([]string, int64, error) @@ -231,6 +232,26 @@ func (broker *CoordinatorBroker) GetRecoveryInfo(ctx context.Context, collection return recoveryInfo.Channels, recoveryInfo.Binlogs, nil } +// GetDataViewVersions retrieves the data view versions of the target collections. +func (broker *CoordinatorBroker) GetDataViewVersions(ctx context.Context, collectionIDs []int64) (map[int64]int64, error) { + ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond)) + defer cancel() + log := log.Ctx(ctx).With(zap.Int("numCollection", len(collectionIDs))) + + req := &datapb.GetDataViewVersionsRequest{ + Base: commonpbutil.NewMsgBase( + commonpbutil.WithMsgType(commonpb.MsgType_GetRecoveryInfo), + ), + CollectionIDs: collectionIDs, + } + resp, err := broker.dataCoord.GetDataViewVersions(ctx, req) + if err = merr.CheckRPCCall(resp, err); err != nil { + log.Warn("GetDataViewVersions failed", zap.Error(err)) + return nil, err + } + return resp.GetDataViewVersions(), nil +} + func (broker *CoordinatorBroker) GetRecoveryInfoV2(ctx context.Context, collectionID UniqueID, partitionIDs ...UniqueID) ([]*datapb.VchannelInfo, []*datapb.SegmentInfo, error) { ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond)) defer cancel() diff --git a/internal/querycoordv2/meta/mock_broker.go b/internal/querycoordv2/meta/mock_broker.go index 23b1b15f28e12..b510aff115502 100644 --- a/internal/querycoordv2/meta/mock_broker.go +++ b/internal/querycoordv2/meta/mock_broker.go @@ -214,6 +214,65 @@ func (_c *MockBroker_GetCollectionLoadInfo_Call) RunAndReturn(run func(context.C return _c } +// GetDataViewVersions provides a mock function with given fields: ctx, collectionIDs +func (_m *MockBroker) GetDataViewVersions(ctx context.Context, collectionIDs []int64) (map[int64]int64, error) { + ret := _m.Called(ctx, collectionIDs) + + if len(ret) == 0 { + panic("no return value specified for GetDataViewVersions") + } + + var r0 map[int64]int64 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, []int64) (map[int64]int64, error)); ok { + return rf(ctx, collectionIDs) + } + if rf, ok := ret.Get(0).(func(context.Context, []int64) map[int64]int64); ok { + r0 = rf(ctx, collectionIDs) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[int64]int64) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, []int64) error); ok { + r1 = rf(ctx, collectionIDs) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockBroker_GetDataViewVersions_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetDataViewVersions' +type MockBroker_GetDataViewVersions_Call struct { + *mock.Call +} + +// GetDataViewVersions is a helper method to define mock.On call +// - ctx context.Context +// - collectionIDs []int64 +func (_e *MockBroker_Expecter) GetDataViewVersions(ctx interface{}, collectionIDs interface{}) *MockBroker_GetDataViewVersions_Call { + return &MockBroker_GetDataViewVersions_Call{Call: _e.mock.On("GetDataViewVersions", ctx, collectionIDs)} +} + +func (_c *MockBroker_GetDataViewVersions_Call) Run(run func(ctx context.Context, collectionIDs []int64)) *MockBroker_GetDataViewVersions_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].([]int64)) + }) + return _c +} + +func (_c *MockBroker_GetDataViewVersions_Call) Return(_a0 map[int64]int64, _a1 error) *MockBroker_GetDataViewVersions_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockBroker_GetDataViewVersions_Call) RunAndReturn(run func(context.Context, []int64) (map[int64]int64, error)) *MockBroker_GetDataViewVersions_Call { + _c.Call.Return(run) + return _c +} + // GetIndexInfo provides a mock function with given fields: ctx, collectionID, segmentIDs func (_m *MockBroker) GetIndexInfo(ctx context.Context, collectionID int64, segmentIDs ...int64) (map[int64][]*querypb.FieldIndexInfo, error) { _va := make([]interface{}, len(segmentIDs)) diff --git a/internal/querycoordv2/observers/target_observer.go b/internal/querycoordv2/observers/target_observer.go index 45e6345488b45..0f2d805e2b1ba 100644 --- a/internal/querycoordv2/observers/target_observer.go +++ b/internal/querycoordv2/observers/target_observer.go @@ -40,6 +40,8 @@ import ( "github.com/milvus-io/milvus/pkg/util/typeutil" ) +const InitialDataViewVersion = 0 + type targetOp int func (op *targetOp) String() string { @@ -93,6 +95,8 @@ type TargetObserver struct { // loadedDispatcher updates targets for loaded collections. loadedDispatcher *taskDispatcher[int64] + dataViewVersions *typeutil.ConcurrentMap[int64, int64] + keylocks *lock.KeyLock[int64] startOnce sync.Once @@ -118,6 +122,7 @@ func NewTargetObserver( updateChan: make(chan targetUpdateRequest, 10), readyNotifiers: make(map[int64][]chan struct{}), initChan: make(chan initRequest), + dataViewVersions: typeutil.NewConcurrentMap[int64, int64](), keylocks: lock.NewKeyLock[int64](), } @@ -176,13 +181,8 @@ func (ob *TargetObserver) schedule(ctx context.Context) { case <-ticker.C: ob.clean() - loaded := lo.FilterMap(ob.meta.GetAllCollections(), func(collection *meta.Collection, _ int) (int64, bool) { - if collection.GetStatus() == querypb.LoadStatus_Loaded { - return collection.GetCollectionID(), true - } - return 0, false - }) - ob.loadedDispatcher.AddTask(loaded...) + versionUpdatedCollections := ob.GetVersionUpdatedCollections() + ob.loadedDispatcher.AddTask(versionUpdatedCollections...) case req := <-ob.updateChan: log.Info("manually trigger update target", @@ -227,6 +227,41 @@ func (ob *TargetObserver) schedule(ctx context.Context) { } } +func (ob *TargetObserver) GetVersionUpdatedCollections() []int64 { + loaded := lo.FilterMap(ob.meta.GetAllCollections(), func(collection *meta.Collection, _ int) (int64, bool) { + if collection.GetStatus() == querypb.LoadStatus_Loaded { + return collection.GetCollectionID(), true + } + return 0, false + }) + versions, err := ob.broker.GetDataViewVersions(context.Background(), loaded) + if err != nil { + log.Warn("GetDataViewVersions from dc failed", zap.Error(err)) + return nil + } + + var ( + staleCnt int + updatedCnt int + ) + + ret := make([]int64, 0) + for _, id := range loaded { + new := versions[id] + current, ok := ob.dataViewVersions.Get(id) + if !ok || new == InitialDataViewVersion || new > current { + ret = append(ret, id) + ob.dataViewVersions.GetOrInsert(id, new) + updatedCnt++ + continue + } + staleCnt++ + } + log.Info("get version updatedCnt collections done", zap.Int("totalCnt", len(loaded)), + zap.Int("staleCnt", staleCnt), zap.Int("updatedCnt", updatedCnt)) + return ret +} + // Check whether provided collection is has current target. // If not, submit an async task into dispatcher. func (ob *TargetObserver) Check(ctx context.Context, collectionID int64, partitionID int64) bool { @@ -311,6 +346,14 @@ func (ob *TargetObserver) ReleasePartition(collectionID int64, partitionID ...in func (ob *TargetObserver) clean() { collectionSet := typeutil.NewUniqueSet(ob.meta.GetAll()...) + // for collection which has been dropped/released, clear data version cache + ob.dataViewVersions.Range(func(collectionID int64, _ int64) bool { + if !collectionSet.Contain(collectionID) { + ob.dataViewVersions.Remove(collectionID) + } + return true + }) + // for collection which has been removed from target, try to clear nextTargetLastUpdate ob.nextTargetLastUpdate.Range(func(collectionID int64, _ time.Time) bool { if !collectionSet.Contain(collectionID) { @@ -352,6 +395,8 @@ func (ob *TargetObserver) updateNextTarget(collectionID int64) error { if err != nil { log.Warn("failed to update next target for collection", zap.Error(err)) + // update next target failed, remove data view version cache + ob.dataViewVersions.Remove(collectionID) return err } ob.updateNextTargetTimestamp(collectionID) diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 55bb1a1894d02..da60f7dbfefd2 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -3318,6 +3318,9 @@ type dataCoordConfig struct { EnableStatsTask ParamItem `refreshable:"true"` TaskCheckInterval ParamItem `refreshable:"true"` + + // data view + DataViewUpdateInterval ParamItem `refreshable:"true"` } func (p *dataCoordConfig) init(base *BaseTable) { @@ -4191,6 +4194,16 @@ During compaction, the size of segment # of rows is able to exceed segment max # Export: false, } p.TaskCheckInterval.Init(base.mgr) + + p.DataViewUpdateInterval = ParamItem{ + Key: "dataCoord.dataView.updateInterval", + Version: "2.5.0", + Doc: "The interval (in seconds) for trying to update the data view of all collections.", + DefaultValue: "10", + PanicIfEmpty: false, + Export: false, + } + p.DataViewUpdateInterval.Init(base.mgr) } // ///////////////////////////////////////////////////////////////////////////// diff --git a/pkg/util/typeutil/map.go b/pkg/util/typeutil/map.go index be4f5af5b74ab..35459194b8b8a 100644 --- a/pkg/util/typeutil/map.go +++ b/pkg/util/typeutil/map.go @@ -7,7 +7,7 @@ import ( ) // MapEqual returns true if the two map contain the same keys and values -func MapEqual(left, right map[int64]int64) bool { +func MapEqual[K, V comparable](left, right map[K]V) bool { if len(left) != len(right) { return false }