Skip to content

Commit

Permalink
fix bm25 import segment loss stats
Browse files Browse the repository at this point in the history
Signed-off-by: aoiasd <[email protected]>
  • Loading branch information
aoiasd committed Dec 30, 2024
1 parent e19b8f2 commit 58b7ce2
Show file tree
Hide file tree
Showing 7 changed files with 12 additions and 7 deletions.
4 changes: 2 additions & 2 deletions internal/datacoord/import_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,13 +314,13 @@ func (s *importScheduler) processInProgressImport(task ImportTask) {
if resp.GetState() == datapb.ImportTaskStateV2_Completed {
for _, info := range resp.GetImportSegmentsInfo() {
// try to parse path and fill logID
err = binlog.CompressBinLogs(info.GetBinlogs(), info.GetDeltalogs(), info.GetStatslogs())
err = binlog.CompressBinLogs(info.GetBinlogs(), info.GetDeltalogs(), info.GetStatslogs(), info.GetBm25Logs())
if err != nil {
log.Warn("fail to CompressBinLogs for import binlogs",
WrapTaskLog(task, zap.Int64("segmentID", info.GetSegmentID()), zap.Error(err))...)
return
}
op1 := UpdateBinlogsOperator(info.GetSegmentID(), info.GetBinlogs(), info.GetStatslogs(), info.GetDeltalogs())
op1 := UpdateBinlogsOperator(info.GetSegmentID(), info.GetBinlogs(), info.GetStatslogs(), info.GetDeltalogs(), info.GetBm25Logs())
op2 := UpdateStatusOperator(info.GetSegmentID(), commonpb.SegmentState_Flushed)
err = s.meta.UpdateSegmentsInfo(context.TODO(), op1, op2)
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion internal/datacoord/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -921,7 +921,7 @@ func AddBinlogsOperator(segmentID int64, binlogs, statslogs, deltalogs, bm25logs
}
}

func UpdateBinlogsOperator(segmentID int64, binlogs, statslogs, deltalogs []*datapb.FieldBinlog) UpdateOperator {
func UpdateBinlogsOperator(segmentID int64, binlogs, statslogs, deltalogs, bm25logs []*datapb.FieldBinlog) UpdateOperator {
return func(modPack *updateSegmentPack) bool {
segment := modPack.Get(segmentID)
if segment == nil {
Expand All @@ -933,6 +933,8 @@ func UpdateBinlogsOperator(segmentID int64, binlogs, statslogs, deltalogs []*dat
segment.Binlogs = binlogs
segment.Statslogs = statslogs
segment.Deltalogs = deltalogs
segment.Bm25Statslogs = bm25logs
log.Info("test--", zap.Any("bm25", bm25logs))
modPack.increments[segmentID] = metastore.BinlogsIncrement{
Segment: segment.SegmentInfo,
}
Expand Down
1 change: 1 addition & 0 deletions internal/datanode/importv2/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func UpdateSegmentInfo(info *datapb.ImportSegmentInfo) UpdateAction {
segmentsInfo[segment].Binlogs = mergeFn(segmentsInfo[segment].Binlogs, info.GetBinlogs())
segmentsInfo[segment].Statslogs = mergeFn(segmentsInfo[segment].Statslogs, info.GetStatslogs())
segmentsInfo[segment].Deltalogs = mergeFn(segmentsInfo[segment].Deltalogs, info.GetDeltalogs())
segmentsInfo[segment].Statslogs = mergeFn(segmentsInfo[segment].Bm25Logs, info.GetBm25Logs())
return
}
segmentsInfo[segment] = info
Expand Down
3 changes: 2 additions & 1 deletion internal/datanode/importv2/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func NewSyncTask(ctx context.Context,

func NewImportSegmentInfo(syncTask syncmgr.Task, metaCaches map[string]metacache.MetaCache) (*datapb.ImportSegmentInfo, error) {
segmentID := syncTask.SegmentID()
insertBinlogs, statsBinlog, deltaLog := syncTask.(*syncmgr.SyncTask).Binlogs()
insertBinlogs, statsBinlog, bm25Log, deltaLog := syncTask.(*syncmgr.SyncTask).Binlogs()
metaCache := metaCaches[syncTask.ChannelName()]
segment, ok := metaCache.GetSegmentByID(segmentID)
if !ok {
Expand All @@ -120,6 +120,7 @@ func NewImportSegmentInfo(syncTask syncmgr.Task, metaCaches map[string]metacache
ImportedRows: segment.FlushedRows(),
Binlogs: lo.Values(insertBinlogs),
Statslogs: lo.Values(statsBinlog),
Bm25Logs: lo.Values(bm25Log),
Deltalogs: deltaLogs,
}, nil
}
Expand Down
4 changes: 2 additions & 2 deletions internal/flushcommon/syncmgr/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,8 +414,8 @@ func (t *SyncTask) IsFlush() bool {
return t.isFlush
}

func (t *SyncTask) Binlogs() (map[int64]*datapb.FieldBinlog, map[int64]*datapb.FieldBinlog, *datapb.FieldBinlog) {
return t.insertBinlogs, t.statsBinlogs, t.deltaBinlog
func (t *SyncTask) Binlogs() (map[int64]*datapb.FieldBinlog, map[int64]*datapb.FieldBinlog, map[int64]*datapb.FieldBinlog, *datapb.FieldBinlog) {
return t.insertBinlogs, t.statsBinlogs, t.bm25Binlogs, t.deltaBinlog
}

func (t *SyncTask) MarshalJSON() ([]byte, error) {
Expand Down
1 change: 1 addition & 0 deletions internal/proto/data_coord.proto
Original file line number Diff line number Diff line change
Expand Up @@ -850,6 +850,7 @@ message ImportSegmentInfo {
repeated FieldBinlog binlogs = 3;
repeated FieldBinlog statslogs = 4;
repeated FieldBinlog deltalogs = 5;
repeated FieldBinlog bm25logs = 6;
}

message QueryImportResponse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (c *channelLifetime) Run() error {
return
}
if tt, ok := t.(*syncmgr.SyncTask); ok {
insertLogs, _, _ := tt.Binlogs()
insertLogs, _, _, _ := tt.Binlogs()
resource.Resource().SegmentAssignStatsManager().UpdateOnSync(tt.SegmentID(), stats.SyncOperationMetrics{
BinLogCounterIncr: 1,
BinLogFileCounterIncr: uint64(len(insertLogs)),
Expand Down

0 comments on commit 58b7ce2

Please sign in to comment.