diff --git a/internal/datacoord/import_scheduler.go b/internal/datacoord/import_scheduler.go index e5b7585e2fb82..9309a29e166af 100644 --- a/internal/datacoord/import_scheduler.go +++ b/internal/datacoord/import_scheduler.go @@ -314,13 +314,13 @@ func (s *importScheduler) processInProgressImport(task ImportTask) { if resp.GetState() == datapb.ImportTaskStateV2_Completed { for _, info := range resp.GetImportSegmentsInfo() { // try to parse path and fill logID - err = binlog.CompressBinLogs(info.GetBinlogs(), info.GetDeltalogs(), info.GetStatslogs()) + err = binlog.CompressBinLogs(info.GetBinlogs(), info.GetDeltalogs(), info.GetStatslogs(), info.GetBm25Logs()) if err != nil { log.Warn("fail to CompressBinLogs for import binlogs", WrapTaskLog(task, zap.Int64("segmentID", info.GetSegmentID()), zap.Error(err))...) return } - op1 := UpdateBinlogsOperator(info.GetSegmentID(), info.GetBinlogs(), info.GetStatslogs(), info.GetDeltalogs()) + op1 := UpdateBinlogsOperator(info.GetSegmentID(), info.GetBinlogs(), info.GetStatslogs(), info.GetDeltalogs(), info.GetBm25Logs()) op2 := UpdateStatusOperator(info.GetSegmentID(), commonpb.SegmentState_Flushed) err = s.meta.UpdateSegmentsInfo(context.TODO(), op1, op2) if err != nil { diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index aab1f9a41e78e..452d181175557 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -921,7 +921,7 @@ func AddBinlogsOperator(segmentID int64, binlogs, statslogs, deltalogs, bm25logs } } -func UpdateBinlogsOperator(segmentID int64, binlogs, statslogs, deltalogs []*datapb.FieldBinlog) UpdateOperator { +func UpdateBinlogsOperator(segmentID int64, binlogs, statslogs, deltalogs, bm25logs []*datapb.FieldBinlog) UpdateOperator { return func(modPack *updateSegmentPack) bool { segment := modPack.Get(segmentID) if segment == nil { @@ -933,6 +933,7 @@ func UpdateBinlogsOperator(segmentID int64, binlogs, statslogs, deltalogs []*dat segment.Binlogs = binlogs segment.Statslogs = 
statslogs segment.Deltalogs = deltalogs + segment.Bm25Statslogs = bm25logs modPack.increments[segmentID] = metastore.BinlogsIncrement{ Segment: segment.SegmentInfo, } diff --git a/internal/datanode/importv2/task.go b/internal/datanode/importv2/task.go index 0d7c46e6cc474..7bde8b0ac56d1 100644 --- a/internal/datanode/importv2/task.go +++ b/internal/datanode/importv2/task.go @@ -143,6 +143,7 @@ func UpdateSegmentInfo(info *datapb.ImportSegmentInfo) UpdateAction { segmentsInfo[segment].Binlogs = mergeFn(segmentsInfo[segment].Binlogs, info.GetBinlogs()) segmentsInfo[segment].Statslogs = mergeFn(segmentsInfo[segment].Statslogs, info.GetStatslogs()) segmentsInfo[segment].Deltalogs = mergeFn(segmentsInfo[segment].Deltalogs, info.GetDeltalogs()) + segmentsInfo[segment].Bm25Logs = mergeFn(segmentsInfo[segment].Bm25Logs, info.GetBm25Logs()) return } segmentsInfo[segment] = info diff --git a/internal/datanode/importv2/util.go b/internal/datanode/importv2/util.go index eb6c592f85b12..2f44e24bf7f93 100644 --- a/internal/datanode/importv2/util.go +++ b/internal/datanode/importv2/util.go @@ -105,7 +105,7 @@ func NewSyncTask(ctx context.Context, func NewImportSegmentInfo(syncTask syncmgr.Task, metaCaches map[string]metacache.MetaCache) (*datapb.ImportSegmentInfo, error) { segmentID := syncTask.SegmentID() - insertBinlogs, statsBinlog, deltaLog := syncTask.(*syncmgr.SyncTask).Binlogs() + insertBinlogs, statsBinlog, bm25Log, deltaLog := syncTask.(*syncmgr.SyncTask).Binlogs() metaCache := metaCaches[syncTask.ChannelName()] segment, ok := metaCache.GetSegmentByID(segmentID) if !ok { @@ -120,6 +120,7 @@ func NewImportSegmentInfo(syncTask syncmgr.Task, metaCaches map[string]metacache ImportedRows: segment.FlushedRows(), Binlogs: lo.Values(insertBinlogs), Statslogs: lo.Values(statsBinlog), + Bm25Logs: lo.Values(bm25Log), Deltalogs: deltaLogs, }, nil } diff --git a/internal/flushcommon/syncmgr/task.go b/internal/flushcommon/syncmgr/task.go 
index e5c2f913cd0c6..d103d300900aa 100644 --- a/internal/flushcommon/syncmgr/task.go +++ b/internal/flushcommon/syncmgr/task.go @@ -414,8 +414,8 @@ func (t *SyncTask) IsFlush() bool { return t.isFlush } -func (t *SyncTask) Binlogs() (map[int64]*datapb.FieldBinlog, map[int64]*datapb.FieldBinlog, *datapb.FieldBinlog) { - return t.insertBinlogs, t.statsBinlogs, t.deltaBinlog +func (t *SyncTask) Binlogs() (map[int64]*datapb.FieldBinlog, map[int64]*datapb.FieldBinlog, map[int64]*datapb.FieldBinlog, *datapb.FieldBinlog) { + return t.insertBinlogs, t.statsBinlogs, t.bm25Binlogs, t.deltaBinlog } func (t *SyncTask) MarshalJSON() ([]byte, error) { diff --git a/internal/proto/data_coord.proto b/internal/proto/data_coord.proto index 5d78f28cf0ddf..c7ca6ab8ce9c3 100644 --- a/internal/proto/data_coord.proto +++ b/internal/proto/data_coord.proto @@ -850,6 +850,7 @@ message ImportSegmentInfo { repeated FieldBinlog binlogs = 3; repeated FieldBinlog statslogs = 4; repeated FieldBinlog deltalogs = 5; + repeated FieldBinlog bm25logs = 6; } message QueryImportResponse { diff --git a/internal/streamingnode/server/flusher/flusherimpl/channel_lifetime.go b/internal/streamingnode/server/flusher/flusherimpl/channel_lifetime.go index 51965267f56fb..cfe69d68d4e28 100644 --- a/internal/streamingnode/server/flusher/flusherimpl/channel_lifetime.go +++ b/internal/streamingnode/server/flusher/flusherimpl/channel_lifetime.go @@ -131,7 +131,7 @@ func (c *channelLifetime) Run() error { return } if tt, ok := t.(*syncmgr.SyncTask); ok { - insertLogs, _, _ := tt.Binlogs() + insertLogs, _, _, _ := tt.Binlogs() resource.Resource().SegmentAssignStatsManager().UpdateOnSync(tt.SegmentID(), stats.SyncOperationMetrics{ BinLogCounterIncr: 1, BinLogFileCounterIncr: uint64(len(insertLogs)),