Skip to content

Commit

Permalink
Add iterators (#27643)
Browse files Browse the repository at this point in the history
See also: #27606

Signed-off-by: yangxuan <[email protected]>
  • Loading branch information
XuanYang-cn authored Oct 18, 2023
1 parent eff773a commit 7358c35
Show file tree
Hide file tree
Showing 6 changed files with 621 additions and 3 deletions.
103 changes: 103 additions & 0 deletions internal/datanode/iterators/binlog_iterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package iterator

import (
"sync"

"go.uber.org/atomic"

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

type BinlogIterator struct {
disposed atomic.Bool
disposedCh chan struct{}
disposedOnce sync.Once

data *storage.InsertData
label *Label
pkFieldID int64
pkType schemapb.DataType
pos int
}

var _ Iterator = (*BinlogIterator)(nil)

// NewInsertBinlogIterator creates a new iterator
func NewInsertBinlogIterator(v [][]byte, pkFieldID typeutil.UniqueID, pkType schemapb.DataType, label *Label) (*BinlogIterator, error) {
blobs := make([]*storage.Blob, len(v))
for i := range blobs {
blobs[i] = &storage.Blob{Value: v[i]}
}

reader := storage.NewInsertCodec()
_, _, iData, err := reader.Deserialize(blobs)
if err != nil {
return nil, err
}

return &BinlogIterator{
disposedCh: make(chan struct{}),
data: iData,
pkFieldID: pkFieldID,
pkType: pkType,
label: label,
}, nil
}

// HasNext returns true if the iterator have unread record
func (i *BinlogIterator) HasNext() bool {
return !i.isDisposed() && i.hasNext()
}

func (i *BinlogIterator) Next() (*LabeledRowData, error) {
if i.isDisposed() {
return nil, ErrDisposed
}

if !i.hasNext() {
return nil, ErrNoMoreRecord
}

fields := make(map[int64]interface{})
for fieldID, fieldData := range i.data.Data {
fields[fieldID] = fieldData.GetRow(i.pos)
}

pk, err := storage.GenPrimaryKeyByRawData(i.data.Data[i.pkFieldID].GetRow(i.pos), i.pkType)
if err != nil {
return nil, err
}

row := &InsertRow{
ID: i.data.Data[common.RowIDField].GetRow(i.pos).(int64),
Timestamp: uint64(i.data.Data[common.TimeStampField].GetRow(i.pos).(int64)),
PK: pk,
Value: fields,
}
i.pos++
return NewLabeledRowData(row, i.label), nil
}

// Dispose disposes the iterator
func (i *BinlogIterator) Dispose() {
i.disposed.CompareAndSwap(false, true)
i.disposedOnce.Do(func() {
close(i.disposedCh)
})
}

func (i *BinlogIterator) hasNext() bool {
return i.pos < i.data.GetRowNum()
}

func (i *BinlogIterator) isDisposed() bool {
return i.disposed.Load()
}

// Disposed wait forever for the iterator to dispose
func (i *BinlogIterator) WaitForDisposed() {
<-i.disposedCh
}
Loading

0 comments on commit 7358c35

Please sign in to comment.