diff --git a/pkg/datasync/consumer_test.go b/pkg/datasync/consumer_test.go index 35fafd77894f9..edf2629043c53 100644 --- a/pkg/datasync/consumer_test.go +++ b/pkg/datasync/consumer_test.go @@ -1070,8 +1070,14 @@ func TestCompleteData(t *testing.T) { err := c.completeData(ctx) assert.NoError(t, err) files, err := fileservice.SortedList(c.dstFS.List(ctx, "")) + var count int + for i := range files { + if !files[i].IsDir && files[i].Size < 10 { + count++ + } + } assert.NoError(t, err) - assert.Equal(t, 35+(150-102), len(files)) + assert.Equal(t, 150-102, count) requiredLsn, err := c.logClient.getRequiredLsn(ctx) assert.NoError(t, err) assert.Equal(t, uint64(103+(150-102)-1), requiredLsn) diff --git a/pkg/frontend/compiler_context.go b/pkg/frontend/compiler_context.go index c0a05f0452ead..54939a41f6a27 100644 --- a/pkg/frontend/compiler_context.go +++ b/pkg/frontend/compiler_context.go @@ -33,6 +33,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/common/pubsub" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/defines" + "github.com/matrixorigin/matrixone/pkg/objectio/ioutil" "github.com/matrixorigin/matrixone/pkg/pb/plan" pb "github.com/matrixorigin/matrixone/pkg/pb/statsinfo" "github.com/matrixorigin/matrixone/pkg/perfcounter" @@ -44,7 +45,6 @@ import ( v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2" "github.com/matrixorigin/matrixone/pkg/util/trace/impl/motrace/statistic" "github.com/matrixorigin/matrixone/pkg/vm/engine" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" "github.com/matrixorigin/matrixone/pkg/vm/process" ) @@ -1009,7 +1009,7 @@ func (tcc *TxnCompilerContext) GetQueryResultMeta(uuid string) ([]*plan.ColDef, // get file size path := catalog.BuildQueryResultMetaPath(proc.GetSessionInfo().Account, uuid) // read meta's meta - reader, err := blockio.NewFileReader(proc.GetService(), proc.Base.FileService, path) + reader, err 
:= ioutil.NewFileReader(proc.Base.FileService, path) if err != nil { return nil, "", err } diff --git a/pkg/frontend/query_result.go b/pkg/frontend/query_result.go index e7e8132b7fe56..eba61c1844586 100644 --- a/pkg/frontend/query_result.go +++ b/pkg/frontend/query_result.go @@ -34,11 +34,11 @@ import ( "github.com/matrixorigin/matrixone/pkg/fileservice" "github.com/matrixorigin/matrixone/pkg/frontend/constant" "github.com/matrixorigin/matrixone/pkg/objectio" + "github.com/matrixorigin/matrixone/pkg/objectio/ioutil" "github.com/matrixorigin/matrixone/pkg/pb/plan" "github.com/matrixorigin/matrixone/pkg/perfcounter" "github.com/matrixorigin/matrixone/pkg/sql/colexec" "github.com/matrixorigin/matrixone/pkg/sql/parsers/tree" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio" "github.com/matrixorigin/matrixone/pkg/vm/process" ) @@ -362,7 +362,7 @@ func checkPrivilege(sid string, uuids []string, reqCtx context.Context, ses *Ses for _, id := range uuids { // var size int64 = -1 path := catalog.BuildQueryResultMetaPath(ses.GetTenantInfo().GetTenant(), id) - reader, err := blockio.NewFileReader(sid, f, path) + reader, err := ioutil.NewFileReader(f, path) if err != nil { return err } @@ -553,7 +553,7 @@ type resultFileInfo struct { func doDumpQueryResult(ctx context.Context, ses *Session, eParam *tree.ExportParam) error { var err error var columnDefs *plan.ResultColDef - var reader *blockio.BlockReader + var reader *ioutil.BlockReader var blocks []objectio.BlockObject var files []resultFileInfo @@ -690,7 +690,7 @@ func openResultMeta(ctx context.Context, ses *Session, queryId string) (*plan.Re } metaFile := catalog.BuildQueryResultMetaPath(account.GetTenant(), queryId) // read meta's meta - reader, err := blockio.NewFileReader(ses.service, getPu(ses.GetService()).FileService, metaFile) + reader, err := ioutil.NewFileReader(getPu(ses.GetService()).FileService, metaFile) if err != nil { return nil, err } @@ -748,10 +748,10 @@ func getResultFiles(ctx 
context.Context, ses *Session, queryId string) ([]result } // openResultFile reads all blocks of the result file -func openResultFile(ctx context.Context, ses *Session, fileName string, fileSize int64) (*blockio.BlockReader, []objectio.BlockObject, error) { +func openResultFile(ctx context.Context, ses *Session, fileName string, fileSize int64) (*ioutil.BlockReader, []objectio.BlockObject, error) { // read result's blocks filePath := getPathOfQueryResultFile(fileName) - reader, err := blockio.NewFileReader(ses.GetService(), getPu(ses.GetService()).FileService, filePath) + reader, err := ioutil.NewFileReader(getPu(ses.GetService()).FileService, filePath) if err != nil { return nil, nil, err } diff --git a/pkg/vm/engine/tae/blockio/reader.go b/pkg/objectio/ioutil/reader.go similarity index 86% rename from pkg/vm/engine/tae/blockio/reader.go rename to pkg/objectio/ioutil/reader.go index 7598e0079d9d9..25aa7aeaef96a 100644 --- a/pkg/vm/engine/tae/blockio/reader.go +++ b/pkg/objectio/ioutil/reader.go @@ -12,10 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-package blockio +package ioutil import ( "context" + "github.com/matrixorigin/matrixone/pkg/common/mpool" "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/types" @@ -24,28 +25,11 @@ import ( "github.com/matrixorigin/matrixone/pkg/objectio" ) -const ( - AsyncIo = 1 - SyncIo = 2 -) - -var IoModel = SyncIo - type BlockReader struct { reader *objectio.ObjectReader - aio *IoPipeline -} - -type fetchParams struct { - idxes []uint16 - typs []types.Type - blk uint16 - pool *mpool.MPool - reader *objectio.ObjectReader } func NewObjectReader( - sid string, service fileservice.FileService, key objectio.Location, opts ...objectio.ReaderOptionFunc, @@ -68,12 +52,10 @@ func NewObjectReader( } return &BlockReader{ reader: reader, - aio: GetPipeline(sid), }, nil } func NewFileReader( - sid string, service fileservice.FileService, name string, ) (*BlockReader, error) { @@ -86,7 +68,6 @@ func NewFileReader( } return &BlockReader{ reader: reader, - aio: GetPipeline(sid), }, nil } @@ -117,24 +98,9 @@ func (r *BlockReader) LoadColumns( return } var ioVectors fileservice.IOVector - if IoModel == AsyncIo { - proc := fetchParams{ - idxes: cols, - blk: blk, - typs: typs, - pool: m, - reader: r.reader, - } - var v any - if v, err = r.aio.Fetch(ctx, proc); err != nil { - return - } - ioVectors = v.(fileservice.IOVector) - } else { - ioVectors, err = r.reader.ReadOneBlock(ctx, cols, typs, blk, m) - if err != nil { - return - } + ioVectors, err = r.reader.ReadOneBlock(ctx, cols, typs, blk, m) + if err != nil { + return } release = func() { objectio.ReleaseIOVector(&ioVectors) @@ -339,29 +305,3 @@ func (r *BlockReader) GetName() string { func (r *BlockReader) GetObjectReader() *objectio.ObjectReader { return r.reader } - -func Prefetch( - sid string, - service fileservice.FileService, - key objectio.Location, -) error { - params, err := BuildPrefetchParams(service, key) - if err != nil { - return err - } - params.typ = PrefetchFileType - 
return MustGetPipeline(sid).Prefetch(params) -} - -func PrefetchMeta( - sid string, - service fileservice.FileService, - key objectio.Location, -) error { - params, err := BuildPrefetchParams(service, key) - if err != nil { - return err - } - params.typ = PrefetchMetaType - return MustGetPipeline(sid).Prefetch(params) -} diff --git a/pkg/vm/engine/engine_util/sinker.go b/pkg/objectio/ioutil/sinker.go similarity index 96% rename from pkg/vm/engine/engine_util/sinker.go rename to pkg/objectio/ioutil/sinker.go index f29474b19345c..1cf8f29b33af5 100644 --- a/pkg/vm/engine/engine_util/sinker.go +++ b/pkg/objectio/ioutil/sinker.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package engine_util +package ioutil import ( "context" @@ -23,10 +23,8 @@ import ( "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/fileservice" "github.com/matrixorigin/matrixone/pkg/objectio" - "github.com/matrixorigin/matrixone/pkg/sql/colexec" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio" + "github.com/matrixorigin/matrixone/pkg/objectio/mergeutil" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/mergesort" ) const DefaultInMemoryStagedSize = mpool.MB * 16 @@ -80,7 +78,7 @@ type FileSinker interface { var _ FileSinker = new(FSinkerImpl) type FSinkerImpl struct { - writer *blockio.BlockWriter + writer *BlockWriter mp *mpool.MPool fs fileservice.FileService @@ -95,12 +93,12 @@ type FSinkerImpl struct { func (s *FSinkerImpl) Sink(ctx context.Context, b *batch.Batch) error { if s.writer == nil { if s.isTombstone { - s.writer = blockio.ConstructTombstoneWriter( + s.writer = ConstructTombstoneWriter( s.hiddenSelection, s.fs, ) } else { - s.writer = blockio.ConstructWriter( + s.writer = ConstructWriter( s.schemaVersion, s.seqnums, s.sortKeyPos, @@ -273,7 +271,7 @@ func NewSinker( return sinker 
} -type stats struct { +type sinkerStats struct { Name string HighWatermarkCnt uint64 HighWatermarkBytes uint64 @@ -281,12 +279,12 @@ type stats struct { CurrentBytes uint64 } -func (s *stats) String() string { +func (s *sinkerStats) String() string { return fmt.Sprintf("%s, high cnt: %d, current cnt: %d, hight bytes: %d, current bytes: %d", s.Name, s.HighWatermarkCnt, s.CurrentCnt, s.HighWatermarkBytes, s.CurrentBytes) } -func (s *stats) updateCount(n int) { +func (s *sinkerStats) updateCount(n int) { if n > 0 { s.CurrentCnt += uint64(n) } else if n < 0 { @@ -298,7 +296,7 @@ func (s *stats) updateCount(n int) { } } -func (s *stats) updateBytes(n int) { +func (s *sinkerStats) updateBytes(n int) { if n > 0 { s.CurrentBytes += uint64(n) } else if n < 0 { @@ -326,7 +324,7 @@ type Sinker struct { factory FileSinkerFactory } staged struct { - inMemStats stats + inMemStats sinkerStats inMemory []*batch.Batch persisted []objectio.ObjectStats inMemorySize int @@ -339,7 +337,7 @@ type Sinker struct { buf struct { isOwner bool - bufStats stats + bufStats sinkerStats buffers *containers.OneSchemaBatchBuffer } @@ -443,7 +441,7 @@ func (sinker *Sinker) trySortInMemoryStaged(ctx context.Context) error { return nil } for _, bat := range sinker.staged.inMemory { - if err := mergesort.SortColumnsByIndex( + if err := mergeutil.SortColumnsByIndex( bat.Vecs, sinker.schema.sortKeyIdx, sinker.mp, @@ -487,7 +485,7 @@ func (sinker *Sinker) trySpill(ctx context.Context) error { if sinker.schema.sortKeyIdx != -1 { buffer := sinker.fetchBuffer() // note the lifecycle of buffer defer sinker.putBackBuffer(buffer) - if err := colexec.MergeSortBatches( + if err := mergeutil.MergeSortBatches( sinker.staged.inMemory, sinker.schema.sortKeyIdx, buffer, diff --git a/pkg/vm/engine/engine_util/sinker_test.go b/pkg/objectio/ioutil/sinker_test.go similarity index 99% rename from pkg/vm/engine/engine_util/sinker_test.go rename to pkg/objectio/ioutil/sinker_test.go index fb20f1a1bfdd8..8229b1908a15c 100644 
--- a/pkg/vm/engine/engine_util/sinker_test.go +++ b/pkg/objectio/ioutil/sinker_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package engine_util +package ioutil import ( "context" diff --git a/pkg/vm/engine/tae/blockio/write_index.go b/pkg/objectio/ioutil/write_index.go similarity index 99% rename from pkg/vm/engine/tae/blockio/write_index.go rename to pkg/objectio/ioutil/write_index.go index 64b9066e6fbb0..66f891da72e96 100644 --- a/pkg/vm/engine/tae/blockio/write_index.go +++ b/pkg/objectio/ioutil/write_index.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package blockio +package ioutil import ( hll "github.com/axiomhq/hyperloglog" diff --git a/pkg/vm/engine/tae/blockio/writer.go b/pkg/objectio/ioutil/writer.go similarity index 99% rename from pkg/vm/engine/tae/blockio/writer.go rename to pkg/objectio/ioutil/writer.go index 792fa0955a249..9795a2d6c5bc4 100644 --- a/pkg/vm/engine/tae/blockio/writer.go +++ b/pkg/objectio/ioutil/writer.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package blockio +package ioutil import ( "context" diff --git a/pkg/sql/colexec/merge_util.go b/pkg/objectio/mergeutil/types.go similarity index 57% rename from pkg/sql/colexec/merge_util.go rename to pkg/objectio/mergeutil/types.go index 56ffd592aed71..d67799aa75286 100644 --- a/pkg/sql/colexec/merge_util.go +++ b/pkg/objectio/mergeutil/types.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-package colexec +package mergeutil import ( "fmt" @@ -21,221 +21,11 @@ import ( "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/nulls" "github.com/matrixorigin/matrixone/pkg/container/types" + "github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/objectio" "github.com/matrixorigin/matrixone/pkg/sort" ) -type dataSlice[T any] interface { - at(i, j int) T - length(i int) int - size() int -} - -type fixedDataSlice[T any] struct { - cols [][]T -} - -func (f *fixedDataSlice[T]) at(i, j int) T { - return f.cols[i][j] -} -func (f *fixedDataSlice[T]) length(i int) int { - return len(f.cols[i]) -} - -func (f *fixedDataSlice[T]) size() int { - return len(f.cols) -} - -type varlenaDataSlice struct { - cols []struct { - data []types.Varlena - area []byte - } -} - -func (v *varlenaDataSlice) at(i, j int) string { - return v.cols[i].data[j].UnsafeGetString(v.cols[i].area) -} - -func (v *varlenaDataSlice) length(i int) int { - return len(v.cols[i].data) -} - -func (v *varlenaDataSlice) size() int { - return len(v.cols) -} - -type mergeInterface interface { - getNextPos() (int, int, int) -} - -type heapElem[T any] struct { - data T - isNull bool - batIndex int - rowIndex int -} - -// merge we will sort by primary key or -// clusterby key, so we just need one -// vector of every batch. -type merge[T comparable] struct { - // the number of bacthes - size int - - // convert the vectors which need to sort - // into ds data - ds dataSlice[T] - - // pointer is used to specify - // which position we have gotten. 
- // for example, rowIdx[i] means - // we are now at the rowIdx[i]-th row for - // cols[i] - rowIdx []int - - nulls []*nulls.Nulls - - heap *heapSlice[T] -} - -func newMerge[T comparable](compLess sort.LessFunc[T], ds dataSlice[T], nulls []*nulls.Nulls) mergeInterface { - m := &merge[T]{ - size: ds.size(), - ds: ds, - rowIdx: make([]int, ds.size()), - nulls: nulls, - heap: newHeapSlice(ds.size(), compLess), - } - m.initHeap() - return m -} - -func (m *merge[T]) initHeap() { - for i := 0; i < m.ds.size(); i++ { - if m.ds.length(i) == 0 { - m.rowIdx[i] = -1 - m.size-- - continue - } - heapPush(m.heap, heapElem[T]{ - data: m.ds.at(i, m.rowIdx[i]), - isNull: m.nulls[i].Contains(uint64(m.rowIdx[i])), - batIndex: i, - rowIndex: m.rowIdx[i], - }) - if m.rowIdx[i] >= m.ds.length(i) { - m.rowIdx[i] = -1 - m.size-- - } - } -} - -func (m *merge[T]) getNextPos() (batchIndex, rowIndex, size int) { - data := m.pushNext() - if data == nil { - // now, m.size is 0 - return -1, -1, m.size - } - return data.batIndex, data.rowIndex, m.size -} - -func (m *merge[T]) pushNext() *heapElem[T] { - if m.size == 0 { - return nil - } - data := heapPop(m.heap) - batchIndex := data.batIndex - m.rowIdx[batchIndex]++ - if m.rowIdx[batchIndex] >= m.ds.length(batchIndex) { - m.rowIdx[batchIndex] = -1 - m.size-- - } - if m.rowIdx[batchIndex] != -1 { - heapPush(m.heap, heapElem[T]{ - data: m.ds.at(batchIndex, m.rowIdx[batchIndex]), - isNull: m.nulls[batchIndex].Contains(uint64(m.rowIdx[batchIndex])), - batIndex: batchIndex, - rowIndex: m.rowIdx[batchIndex], - }) - } - return &data -} - -type heapSlice[T any] struct { - lessFunc sort.LessFunc[T] - s []heapElem[T] -} - -func newHeapSlice[T any](n int, lessFunc sort.LessFunc[T]) *heapSlice[T] { - return &heapSlice[T]{ - lessFunc: lessFunc, - s: make([]heapElem[T], 0, n), - } -} - -// Push pushes the element x onto the heap. -// The complexity is Operator(log n) where n = len(h). 
-func heapPush[T any](h *heapSlice[T], x heapElem[T]) { - h.s = append(h.s, x) - up(h, len(h.s)-1) -} - -// Pop removes and returns the minimum element (according to Less) from the heap. -// The complexity is Operator(log n) where n = len(h). -// Pop is equivalent to Remove(h, 0). -func heapPop[T any](h *heapSlice[T]) heapElem[T] { - n := len(h.s) - 1 - (h.s)[0], (h.s)[n] = (h.s)[n], (h.s)[0] - down(h, 0, n) - res := (h.s)[n] - h.s = (h.s)[:n] - return res -} - -func up[T any](h *heapSlice[T], j int) { - for { - i := (j - 1) / 2 // parent - if i == j || !h.Less(j, i) { - break - } - h.Swap(i, j) - j = i - } -} - -func down[T any](h *heapSlice[T], i0, n int) bool { - i := i0 - for { - j1 := 2*i + 1 - if j1 >= n || j1 < 0 { // j1 < 0 after int overflow - break - } - j := j1 // left child - if j2 := j1 + 1; j2 < n && h.Less(j2, j1) { - j = j2 // = 2*i + 2 // right child - } - if !h.Less(j, i) { - break - } - h.Swap(i, j) - i = j - } - return i > i0 -} - -func (x *heapSlice[T]) Less(i, j int) bool { - if x.s[i].isNull { - return true - } - if x.s[j].isNull { - return false - } - return x.lessFunc(x.s[i].data, x.s[j].data) -} -func (x *heapSlice[T]) Swap(i, j int) { x.s[i], x.s[j] = x.s[j], x.s[i] } -func (x *heapSlice[T]) Len() int { return len(x.s) } - type SinkerT func(*batch.Batch) error func MergeSortBatches( @@ -362,3 +152,21 @@ func MergeSortBatches( } return nil } + +func SortColumnsByIndex( + cols []*vector.Vector, sortIdx int, mp *mpool.MPool, +) (err error) { + sortKey := cols[sortIdx] + sortedIdx := make([]int64, sortKey.Length()) + for i := 0; i < len(sortedIdx); i++ { + sortedIdx[i] = int64(i) + } + sort.Sort(false, false, true, sortedIdx, sortKey) + for i := 0; i < len(cols); i++ { + err = cols[i].Shuffle(sortedIdx, mp) + if err != nil { + return + } + } + return +} diff --git a/pkg/objectio/mergeutil/util.go b/pkg/objectio/mergeutil/util.go new file mode 100644 index 0000000000000..58c5dc618e135 --- /dev/null +++ b/pkg/objectio/mergeutil/util.go @@ -0,0 
+1,260 @@ +// Copyright 2022 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mergeutil + +import ( + "github.com/matrixorigin/matrixone/pkg/container/batch" + "github.com/matrixorigin/matrixone/pkg/container/nulls" + "github.com/matrixorigin/matrixone/pkg/container/types" + "github.com/matrixorigin/matrixone/pkg/container/vector" + "github.com/matrixorigin/matrixone/pkg/sort" +) + +type dataSlice[T any] interface { + at(i, j int) T + length(i int) int + size() int +} + +func getFixedCols[T types.FixedSizeT](bats []*batch.Batch, idx int) (cols [][]T) { + cols = make([][]T, 0, len(bats)) + for i := range bats { + cols = append(cols, vector.MustFixedColWithTypeCheck[T](bats[i].Vecs[idx])) + } + return +} + +func getVarlenaCols(bats []*batch.Batch, idx int) (cols []struct { + data []types.Varlena + area []byte +}) { + cols = make([]struct { + data []types.Varlena + area []byte + }, 0, len(bats)) + for i := range bats { + data, area := vector.MustVarlenaRawData(bats[i].Vecs[idx]) + cols = append(cols, struct { + data []types.Varlena + area []byte + }{data, area}) + } + return +} + +type fixedDataSlice[T any] struct { + cols [][]T +} + +func (f *fixedDataSlice[T]) at(i, j int) T { + return f.cols[i][j] +} +func (f *fixedDataSlice[T]) length(i int) int { + return len(f.cols[i]) +} + +func (f *fixedDataSlice[T]) size() int { + return len(f.cols) +} + +type varlenaDataSlice struct { + cols []struct { + data []types.Varlena + area []byte + } 
+} + +func (v *varlenaDataSlice) at(i, j int) string { + return v.cols[i].data[j].UnsafeGetString(v.cols[i].area) +} + +func (v *varlenaDataSlice) length(i int) int { + return len(v.cols[i].data) +} + +func (v *varlenaDataSlice) size() int { + return len(v.cols) +} + +type mergeInterface interface { + getNextPos() (int, int, int) +} + +type heapElem[T any] struct { + data T + isNull bool + batIndex int + rowIndex int +} + +// merge we will sort by primary key or +// clusterby key, so we just need one +// vector of every batch. +type merge[T comparable] struct { + // the number of batches + size int + + // convert the vectors which need to sort + // into ds data + ds dataSlice[T] + + // pointer is used to specify + // which position we have gotten. + // for example, rowIdx[i] means + // we are now at the rowIdx[i]-th row for + // cols[i] + rowIdx []int + + nulls []*nulls.Nulls + + heap *heapSlice[T] +} + +func newMerge[T comparable](compLess sort.LessFunc[T], ds dataSlice[T], nulls []*nulls.Nulls) mergeInterface { + m := &merge[T]{ + size: ds.size(), + ds: ds, + rowIdx: make([]int, ds.size()), + nulls: nulls, + heap: newHeapSlice(ds.size(), compLess), + } + m.initHeap() + return m +} + +func (m *merge[T]) initHeap() { + for i := 0; i < m.ds.size(); i++ { + if m.ds.length(i) == 0 { + m.rowIdx[i] = -1 + m.size-- + continue + } + heapPush(m.heap, heapElem[T]{ + data: m.ds.at(i, m.rowIdx[i]), + isNull: m.nulls[i].Contains(uint64(m.rowIdx[i])), + batIndex: i, + rowIndex: m.rowIdx[i], + }) + if m.rowIdx[i] >= m.ds.length(i) { + m.rowIdx[i] = -1 + m.size-- + } + } +} + +func (m *merge[T]) getNextPos() (batchIndex, rowIndex, size int) { + data := m.pushNext() + if data == nil { + // now, m.size is 0 + return -1, -1, m.size + } + return data.batIndex, data.rowIndex, m.size +} + +func (m *merge[T]) pushNext() *heapElem[T] { + if m.size == 0 { + return nil + } + data := heapPop(m.heap) + batchIndex := data.batIndex + m.rowIdx[batchIndex]++ + if m.rowIdx[batchIndex] >=
m.ds.length(batchIndex) { + m.rowIdx[batchIndex] = -1 + m.size-- + } + if m.rowIdx[batchIndex] != -1 { + heapPush(m.heap, heapElem[T]{ + data: m.ds.at(batchIndex, m.rowIdx[batchIndex]), + isNull: m.nulls[batchIndex].Contains(uint64(m.rowIdx[batchIndex])), + batIndex: batchIndex, + rowIndex: m.rowIdx[batchIndex], + }) + } + return &data +} + +type heapSlice[T any] struct { + lessFunc sort.LessFunc[T] + s []heapElem[T] +} + +func newHeapSlice[T any](n int, lessFunc sort.LessFunc[T]) *heapSlice[T] { + return &heapSlice[T]{ + lessFunc: lessFunc, + s: make([]heapElem[T], 0, n), + } +} + +// Push pushes the element x onto the heap. +// The complexity is O(log n) where n = len(h). +func heapPush[T any](h *heapSlice[T], x heapElem[T]) { + h.s = append(h.s, x) + up(h, len(h.s)-1) +} + +// Pop removes and returns the minimum element (according to Less) from the heap. +// The complexity is O(log n) where n = len(h). +// Pop is equivalent to Remove(h, 0). +func heapPop[T any](h *heapSlice[T]) heapElem[T] { + n := len(h.s) - 1 + (h.s)[0], (h.s)[n] = (h.s)[n], (h.s)[0] + down(h, 0, n) + res := (h.s)[n] + h.s = (h.s)[:n] + return res +} + +func up[T any](h *heapSlice[T], j int) { + for { + i := (j - 1) / 2 // parent + if i == j || !h.Less(j, i) { + break + } + h.Swap(i, j) + j = i + } +} + +func down[T any](h *heapSlice[T], i0, n int) bool { + i := i0 + for { + j1 := 2*i + 1 + if j1 >= n || j1 < 0 { // j1 < 0 after int overflow + break + } + j := j1 // left child + if j2 := j1 + 1; j2 < n && h.Less(j2, j1) { + j = j2 // = 2*i + 2 // right child + } + if !h.Less(j, i) { + break + } + h.Swap(i, j) + i = j + } + return i > i0 +} + +func (x *heapSlice[T]) Less(i, j int) bool { + if x.s[i].isNull { + return true + } + if x.s[j].isNull { + return false + } + return x.lessFunc(x.s[i].data, x.s[j].data) +} +func (x *heapSlice[T]) Swap(i, j int) { x.s[i], x.s[j] = x.s[j], x.s[i] } +func (x *heapSlice[T]) Len() int { return len(x.s) } diff --git
a/pkg/sql/colexec/external/external.go b/pkg/sql/colexec/external/external.go index ee45720816f43..68a2724a763dc 100644 --- a/pkg/sql/colexec/external/external.go +++ b/pkg/sql/colexec/external/external.go @@ -48,6 +48,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/fileservice" "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/objectio" + "github.com/matrixorigin/matrixone/pkg/objectio/ioutil" "github.com/matrixorigin/matrixone/pkg/pb/plan" "github.com/matrixorigin/matrixone/pkg/perfcounter" "github.com/matrixorigin/matrixone/pkg/sql/colexec" @@ -58,7 +59,6 @@ import ( v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2" "github.com/matrixorigin/matrixone/pkg/util/trace" "github.com/matrixorigin/matrixone/pkg/vm" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/options" "github.com/matrixorigin/matrixone/pkg/vm/process" ) @@ -898,7 +898,7 @@ func scanCsvFile(ctx context.Context, param *ExternalParam, proc *process.Proces } // note: getBatchFromZonemapFile will access Fileservice -func getBatchFromZonemapFile(ctx context.Context, param *ExternalParam, proc *process.Process, objectReader *blockio.BlockReader, bat *batch.Batch) (err error) { +func getBatchFromZonemapFile(ctx context.Context, param *ExternalParam, proc *process.Process, objectReader *ioutil.BlockReader, bat *batch.Batch) (err error) { var tmpBat *batch.Batch var vecTmp *vector.Vector var release func() @@ -1024,7 +1024,7 @@ func needRead(ctx context.Context, param *ExternalParam, proc *process.Process) notReportErrCtx, proc, expr, meta, columnMap, zms, vecs) } -func getZonemapBatch(ctx context.Context, param *ExternalParam, proc *process.Process, objectReader *blockio.BlockReader, bat *batch.Batch) error { +func getZonemapBatch(ctx context.Context, param *ExternalParam, proc *process.Process, objectReader *ioutil.BlockReader, bat *batch.Batch) error { var err error // note: LoadAllBlocks 
will access Fileservice,must user `ctx` as paramenter param.Zoneparam.bs, err = objectReader.LoadAllBlocks(ctx, proc.GetMPool()) @@ -1046,7 +1046,7 @@ func getZonemapBatch(ctx context.Context, param *ExternalParam, proc *process.Pr func scanZonemapFile(ctx context.Context, param *ExternalParam, proc *process.Process, bat *batch.Batch, analyzer process.Analyzer) error { var err error - param.Filter.blockReader, err = blockio.NewFileReader(proc.GetService(), param.Extern.FileService, param.Fileparam.Filepath) + param.Filter.blockReader, err = ioutil.NewFileReader(param.Extern.FileService, param.Fileparam.Filepath) if err != nil { return err } diff --git a/pkg/sql/colexec/external/types.go b/pkg/sql/colexec/external/types.go index 3e4a79ebaf7c6..cf2e9372dadfe 100644 --- a/pkg/sql/colexec/external/types.go +++ b/pkg/sql/colexec/external/types.go @@ -25,13 +25,13 @@ import ( "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/objectio" + "github.com/matrixorigin/matrixone/pkg/objectio/ioutil" "github.com/matrixorigin/matrixone/pkg/pb/pipeline" "github.com/matrixorigin/matrixone/pkg/sql/colexec" "github.com/matrixorigin/matrixone/pkg/sql/parsers/tree" "github.com/matrixorigin/matrixone/pkg/sql/plan" "github.com/matrixorigin/matrixone/pkg/sql/util/csvparser" "github.com/matrixorigin/matrixone/pkg/vm" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio" "github.com/matrixorigin/matrixone/pkg/vm/process" ) @@ -100,7 +100,7 @@ type FilterParam struct { zonemappable bool columnMap map[int]int FilterExpr *plan.Expr - blockReader *blockio.BlockReader + blockReader *ioutil.BlockReader } type container struct { diff --git a/pkg/sql/colexec/multi_update/s3writer.go b/pkg/sql/colexec/multi_update/s3writer.go index e47a2be75ae93..901cde268c2e6 100644 --- a/pkg/sql/colexec/multi_update/s3writer.go +++ b/pkg/sql/colexec/multi_update/s3writer.go @@ -26,10 +26,11 @@ import ( 
"github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/objectio" + "github.com/matrixorigin/matrixone/pkg/objectio/ioutil" + "github.com/matrixorigin/matrixone/pkg/objectio/mergeutil" "github.com/matrixorigin/matrixone/pkg/perfcounter" "github.com/matrixorigin/matrixone/pkg/sql/colexec" "github.com/matrixorigin/matrixone/pkg/sql/colexec/deletion" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/options" "github.com/matrixorigin/matrixone/pkg/vm/process" ) @@ -380,7 +381,7 @@ func (writer *s3Writer) sortAndSyncOneTable( bats []*batch.Batch, needSortBatch bool, needCleanBatch bool) (err error) { - var blockWriter *blockio.BlockWriter + var blockWriter *ioutil.BlockWriter var blockInfos []objectio.BlockInfo var objStats objectio.ObjectStats @@ -472,7 +473,7 @@ func (writer *s3Writer) sortAndSyncOneTable( return err } - err = colexec.MergeSortBatches(bats, sortIndex, buf, sinker, proc.GetMPool(), needCleanBatch) + err = mergeutil.MergeSortBatches(bats, sortIndex, buf, sinker, proc.GetMPool(), needCleanBatch) if err != nil { return } diff --git a/pkg/sql/colexec/multi_update/s3writer_util.go b/pkg/sql/colexec/multi_update/s3writer_util.go index 7a8bc85b9d1c5..8f8247395b5bf 100644 --- a/pkg/sql/colexec/multi_update/s3writer_util.go +++ b/pkg/sql/colexec/multi_update/s3writer_util.go @@ -29,17 +29,17 @@ import ( "github.com/matrixorigin/matrixone/pkg/fileservice" "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/objectio" + "github.com/matrixorigin/matrixone/pkg/objectio/ioutil" "github.com/matrixorigin/matrixone/pkg/sql/colexec" "github.com/matrixorigin/matrixone/pkg/sql/plan" "github.com/matrixorigin/matrixone/pkg/vm/engine/disttae" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/index" 
"github.com/matrixorigin/matrixone/pkg/vm/process" ) func generateBlockWriter(writer *s3Writer, proc *process.Process, idx int, - isDelete bool) (*blockio.BlockWriter, error) { + isDelete bool) (*ioutil.BlockWriter, error) { // Use uuid as segment id // TODO: multiple 64m file in one segment obj := colexec.Get().GenerateObject() @@ -53,7 +53,7 @@ func generateBlockWriter(writer *s3Writer, seqnums = nil sortIdx = 0 } - blockWriter, err := blockio.NewBlockWriterNew( + blockWriter, err := ioutil.NewBlockWriterNew( s3, obj, writer.schemaVersions[idx], @@ -264,7 +264,7 @@ func fetchSomeVecFromCompactBatchs( return retBats, nil } -func syncThenGetBlockInfoAndStats(ctx context.Context, blockWriter *blockio.BlockWriter, sortIdx int) ([]objectio.BlockInfo, objectio.ObjectStats, error) { +func syncThenGetBlockInfoAndStats(ctx context.Context, blockWriter *ioutil.BlockWriter, sortIdx int) ([]objectio.BlockInfo, objectio.ObjectStats, error) { blocks, _, err := blockWriter.Sync(ctx) if err != nil { return nil, objectio.ObjectStats{}, err diff --git a/pkg/sql/colexec/s3util.go b/pkg/sql/colexec/s3util.go index 34cbae4ca00d9..198851a058589 100644 --- a/pkg/sql/colexec/s3util.go +++ b/pkg/sql/colexec/s3util.go @@ -28,10 +28,11 @@ import ( "github.com/matrixorigin/matrixone/pkg/fileservice" "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/objectio" + "github.com/matrixorigin/matrixone/pkg/objectio/ioutil" + "github.com/matrixorigin/matrixone/pkg/objectio/mergeutil" "github.com/matrixorigin/matrixone/pkg/pb/plan" "github.com/matrixorigin/matrixone/pkg/sort" "github.com/matrixorigin/matrixone/pkg/vm" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/index" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/options" "github.com/matrixorigin/matrixone/pkg/vm/process" @@ -53,7 +54,7 @@ type S3Writer struct { isTombstone bool - writer *blockio.BlockWriter + writer 
*ioutil.BlockWriter // the third vector only has several rows, not aligns with the other two vectors. blockInfoBat *batch.Batch @@ -286,32 +287,6 @@ func (w *S3Writer) StashBatch(proc *process.Process, bat *batch.Batch) bool { return res } -func getFixedCols[T types.FixedSizeT](bats []*batch.Batch, idx int) (cols [][]T) { - cols = make([][]T, 0, len(bats)) - for i := range bats { - cols = append(cols, vector.MustFixedColWithTypeCheck[T](bats[i].Vecs[idx])) - } - return -} - -func getVarlenaCols(bats []*batch.Batch, idx int) (cols []struct { - data []types.Varlena - area []byte -}) { - cols = make([]struct { - data []types.Varlena - area []byte - }, 0, len(bats)) - for i := range bats { - data, area := vector.MustVarlenaRawData(bats[i].Vecs[idx]) - cols = append(cols, struct { - data []types.Varlena - area []byte - }{data, area}) - } - return -} - func (w *S3Writer) FlushTailBatch(ctx context.Context, proc *process.Process) ([]objectio.BlockInfo, objectio.ObjectStats, error) { if w.batSize >= TagS3SizeForMOLogger { return w.SortAndSync(ctx, proc) @@ -362,7 +337,7 @@ func (w *S3Writer) SortAndSync(ctx context.Context, proc *process.Process) ([]ob _, err := w.writer.WriteBatch(bat) return err } - if err := MergeSortBatches( + if err := mergeutil.MergeSortBatches( w.batches, w.sortIndex, w.buffer, @@ -383,7 +358,7 @@ func (w *S3Writer) generateWriter(proc *process.Process) (objectio.ObjectName, e if err != nil { return nil, err } - w.writer, err = blockio.NewBlockWriterNew( + w.writer, err = ioutil.NewBlockWriterNew( s3, obj, w.schemaVersion, w.seqnums, w.isTombstone, ) if err != nil { diff --git a/pkg/sql/colexec/s3util_test.go b/pkg/sql/colexec/s3util_test.go index 1205669cd8606..5e26680ec6267 100644 --- a/pkg/sql/colexec/s3util_test.go +++ b/pkg/sql/colexec/s3util_test.go @@ -18,6 +18,7 @@ import ( "testing" "github.com/matrixorigin/matrixone/pkg/objectio" + "github.com/matrixorigin/matrixone/pkg/objectio/mergeutil" "github.com/stretchr/testify/require" @@ -122,7 
+123,7 @@ func TestMergeSortBatches(t *testing.T) { vector.AppendFixed(bat1.Vecs[0], int32(2), false, pool) vector.AppendFixed(bat2.Vecs[0], int32(1), false, pool) - err = MergeSortBatches( + err = mergeutil.MergeSortBatches( []*batch.Batch{bat1, bat2}, 1, buffer, @@ -152,7 +153,7 @@ func TestMergeSortBatches(t *testing.T) { vector.AppendFixed(bat1.Vecs[0], int32(2), false, pool) vector.AppendFixed(bat2.Vecs[0], int32(1), false, pool) - err = MergeSortBatches( + err = mergeutil.MergeSortBatches( []*batch.Batch{bat1, bat2}, 1, buffer, @@ -182,7 +183,7 @@ func TestMergeSortBatches(t *testing.T) { vector.AppendFixed(bat1.Vecs[0], int32(2), false, pool) vector.AppendFixed(bat2.Vecs[0], int32(1), false, pool) - err = MergeSortBatches( + err = mergeutil.MergeSortBatches( []*batch.Batch{bat1, bat2}, 1, buffer, @@ -212,7 +213,7 @@ func TestMergeSortBatches(t *testing.T) { vector.AppendFixed(bat1.Vecs[0], int32(2), false, pool) vector.AppendFixed(bat2.Vecs[0], int32(1), false, pool) - err = MergeSortBatches( + err = mergeutil.MergeSortBatches( []*batch.Batch{bat1, bat2}, 1, buffer, @@ -242,7 +243,7 @@ func TestMergeSortBatches(t *testing.T) { vector.AppendFixed(bat1.Vecs[0], int32(2), false, pool) vector.AppendFixed(bat2.Vecs[0], int32(1), false, pool) - err = MergeSortBatches( + err = mergeutil.MergeSortBatches( []*batch.Batch{bat1, bat2}, 1, buffer, @@ -272,7 +273,7 @@ func TestMergeSortBatches(t *testing.T) { vector.AppendFixed(bat1.Vecs[0], int32(2), false, pool) vector.AppendFixed(bat2.Vecs[0], int32(1), false, pool) - err = MergeSortBatches( + err = mergeutil.MergeSortBatches( []*batch.Batch{bat1, bat2}, 1, buffer, @@ -302,7 +303,7 @@ func TestMergeSortBatches(t *testing.T) { vector.AppendFixed(bat1.Vecs[0], int32(2), false, pool) vector.AppendFixed(bat2.Vecs[0], int32(1), false, pool) - err = MergeSortBatches( + err = mergeutil.MergeSortBatches( []*batch.Batch{bat1, bat2}, 1, buffer, @@ -332,7 +333,7 @@ func TestMergeSortBatches(t *testing.T) { 
vector.AppendFixed(bat1.Vecs[0], int32(2), false, pool) vector.AppendFixed(bat2.Vecs[0], int32(1), false, pool) - err = MergeSortBatches( + err = mergeutil.MergeSortBatches( []*batch.Batch{bat1, bat2}, 1, buffer, @@ -362,7 +363,7 @@ func TestMergeSortBatches(t *testing.T) { vector.AppendFixed(bat1.Vecs[0], int32(2), false, pool) vector.AppendFixed(bat2.Vecs[0], int32(1), false, pool) - err = MergeSortBatches( + err = mergeutil.MergeSortBatches( []*batch.Batch{bat1, bat2}, 1, buffer, @@ -392,7 +393,7 @@ func TestMergeSortBatches(t *testing.T) { vector.AppendFixed(bat1.Vecs[0], int32(2), false, pool) vector.AppendFixed(bat2.Vecs[0], int32(1), false, pool) - err = MergeSortBatches( + err = mergeutil.MergeSortBatches( []*batch.Batch{bat1, bat2}, 1, buffer, diff --git a/pkg/sql/colexec/table_function/meta_scan.go b/pkg/sql/colexec/table_function/meta_scan.go index d4f3c74cdb2a2..5ab02cd12e5ec 100644 --- a/pkg/sql/colexec/table_function/meta_scan.go +++ b/pkg/sql/colexec/table_function/meta_scan.go @@ -18,8 +18,8 @@ import ( "github.com/matrixorigin/matrixone/pkg/catalog" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/container/vector" + "github.com/matrixorigin/matrixone/pkg/objectio/ioutil" "github.com/matrixorigin/matrixone/pkg/sql/colexec" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" "github.com/matrixorigin/matrixone/pkg/vm/process" ) @@ -45,7 +45,7 @@ func (s *metaScanState) start(tf *TableFunction, proc *process.Process, nthRow i path := catalog.BuildQueryResultMetaPath(proc.GetSessionInfo().Account, uuid.String()) // Get reader - reader, err := blockio.NewFileReader(proc.GetService(), proc.Base.FileService, path) + reader, err := ioutil.NewFileReader(proc.Base.FileService, path) if err != nil { return err } diff --git a/pkg/util/export/etl/tae.go b/pkg/util/export/etl/tae.go index 5b239ac4682b5..2626126ebe00c 100644 --- 
a/pkg/util/export/etl/tae.go +++ b/pkg/util/export/etl/tae.go @@ -29,8 +29,8 @@ import ( "github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/fileservice" "github.com/matrixorigin/matrixone/pkg/objectio" + "github.com/matrixorigin/matrixone/pkg/objectio/ioutil" "github.com/matrixorigin/matrixone/pkg/util/export/table" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio" ) const BatchSize = 8192 @@ -48,7 +48,7 @@ type TAEWriter struct { filename string fs fileservice.FileService //writer objectio.Writer - writer *blockio.BlockWriter + writer *ioutil.BlockWriter rows []*table.Row flushRows int @@ -71,7 +71,7 @@ func NewTAEWriter(ctx context.Context, tbl *table.Table, mp *mpool.MPool, filePa w.columnsTypes = append(w.columnsTypes, c.ColType.ToType()) w.idxs[idx] = uint16(idx) } - w.writer, _ = blockio.NewBlockWriter(fs, filePath) + w.writer, _ = ioutil.NewBlockWriter(fs, filePath) return w } @@ -308,7 +308,7 @@ type TAEReader struct { typs []types.Type idxs []uint16 - blockReader *blockio.BlockReader + blockReader *ioutil.BlockReader bs []objectio.BlockObject batchs []*batch.Batch @@ -334,7 +334,7 @@ func NewTaeReader(ctx context.Context, tbl *table.Table, filePath string, filesi r.typs = append(r.typs, c.ColType.ToType()) r.idxs[idx] = uint16(idx) } - r.blockReader, err = blockio.NewFileReaderNoCache(r.fs, r.filepath) + r.blockReader, err = ioutil.NewFileReaderNoCache(r.fs, r.filepath) if err != nil { return nil, err } diff --git a/pkg/vm/engine/disttae/merge.go b/pkg/vm/engine/disttae/merge.go index b3cb10182a2b2..6c0428935350b 100644 --- a/pkg/vm/engine/disttae/merge.go +++ b/pkg/vm/engine/disttae/merge.go @@ -17,10 +17,12 @@ package disttae import ( "context" "fmt" - "github.com/matrixorigin/matrixone/pkg/vm/engine/engine_util" "strings" "sync/atomic" + "github.com/matrixorigin/matrixone/pkg/objectio/ioutil" + "github.com/matrixorigin/matrixone/pkg/vm/engine/engine_util" + 
"github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/vm/engine" @@ -247,8 +249,8 @@ func (t *cnMergeTask) prepareCommitEntry() *api.MergeCommitEntry { return commitEntry } -func (t *cnMergeTask) PrepareNewWriter() *blockio.BlockWriter { - writer := blockio.ConstructWriterWithSegmentID( +func (t *cnMergeTask) PrepareNewWriter() *ioutil.BlockWriter { + writer := ioutil.ConstructWriterWithSegmentID( t.segmentID, t.num, t.host.version, diff --git a/pkg/vm/engine/disttae/transfer.go b/pkg/vm/engine/disttae/transfer.go index a9d601863dbe3..45611954373d8 100644 --- a/pkg/vm/engine/disttae/transfer.go +++ b/pkg/vm/engine/disttae/transfer.go @@ -30,6 +30,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/fileservice" "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/objectio" + "github.com/matrixorigin/matrixone/pkg/objectio/mergeutil" "github.com/matrixorigin/matrixone/pkg/pb/plan" "github.com/matrixorigin/matrixone/pkg/txn/trace" v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2" @@ -38,7 +39,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm/engine/engine_util" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/mergesort" "go.uber.org/zap" ) @@ -340,7 +340,7 @@ func batchTransferToTombstones( if err = targetRowids.PreExtend(transferIntents.Length(), mp); err != nil { return } - if err = mergesort.SortColumnsByIndex( + if err = mergeutil.SortColumnsByIndex( []*vector.Vector{searchPKColumn, searchEntryPos, searchBatPos}, 0, mp, @@ -362,7 +362,7 @@ func batchTransferToTombstones( return } - if err = mergesort.SortColumnsByIndex( + if err = mergeutil.SortColumnsByIndex( []*vector.Vector{readPKColumn, targetRowids}, 0, mp, diff --git a/pkg/vm/engine/disttae/transfer_util.go b/pkg/vm/engine/disttae/transfer_util.go index 7d29fb4aad2d0..3c6603dddc83a 100644 --- 
a/pkg/vm/engine/disttae/transfer_util.go +++ b/pkg/vm/engine/disttae/transfer_util.go @@ -16,18 +16,20 @@ package disttae import ( "context" + "github.com/matrixorigin/matrixone/pkg/common/mpool" "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/fileservice" "github.com/matrixorigin/matrixone/pkg/objectio" + "github.com/matrixorigin/matrixone/pkg/objectio/ioutil" + "github.com/matrixorigin/matrixone/pkg/objectio/mergeutil" plan2 "github.com/matrixorigin/matrixone/pkg/sql/plan" "github.com/matrixorigin/matrixone/pkg/vm/engine" "github.com/matrixorigin/matrixone/pkg/vm/engine/engine_util" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/mergesort" "go.uber.org/zap" ) @@ -164,7 +166,7 @@ type TransferFlow struct { newDataObjects []objectio.ObjectStats buffer *containers.OneSchemaBatchBuffer staged *batch.Batch - sinker *engine_util.Sinker + sinker *ioutil.Sinker mp *mpool.MPool fs fileservice.FileService @@ -187,14 +189,14 @@ func (flow *TransferFlow) fillDefaults() { ) } if flow.sinker == nil { - flow.sinker = engine_util.NewTombstoneSinker( + flow.sinker = ioutil.NewTombstoneSinker( flow.hiddenSelection, pkType, flow.mp, flow.fs, - engine_util.WithBuffer(flow.buffer, false), - engine_util.WithMemorySizeThreshold(mpool.MB*16), - engine_util.WithTailSizeCap(0), + ioutil.WithBuffer(flow.buffer, false), + ioutil.WithMemorySizeThreshold(mpool.MB*16), + ioutil.WithTailSizeCap(0), //engine_util.WithAllMergeSorted(), ) } @@ -286,7 +288,7 @@ func (flow *TransferFlow) transferStaged(ctx context.Context) error { // sort staged batch by primary key // TODO: do not sort if fake pk - if err := mergesort.SortColumnsByIndex( + if err := mergeutil.SortColumnsByIndex( staged.Vecs, 1, flow.mp, diff 
--git a/pkg/vm/engine/tae/blockio/prefetch.go b/pkg/vm/engine/tae/blockio/prefetch.go index a585d5abfa543..1e8deda9b6ced 100644 --- a/pkg/vm/engine/tae/blockio/prefetch.go +++ b/pkg/vm/engine/tae/blockio/prefetch.go @@ -15,6 +15,7 @@ package blockio import ( + "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/fileservice" "github.com/matrixorigin/matrixone/pkg/objectio" ) @@ -40,6 +41,13 @@ func BuildPrefetchParams(service fileservice.FileService, key objectio.Location) return pp, nil } +type fetchParams struct { + idxes []uint16 + typs []types.Type + blk uint16 + reader *objectio.ObjectReader +} + func buildPrefetchParams(service fileservice.FileService, key objectio.Location) PrefetchParams { return PrefetchParams{ fs: service, @@ -54,3 +62,29 @@ func mergePrefetch(processes []PrefetchParams) map[string]PrefetchParams { } return pc } + +func Prefetch( + sid string, + service fileservice.FileService, + key objectio.Location, +) error { + params, err := BuildPrefetchParams(service, key) + if err != nil { + return err + } + params.typ = PrefetchFileType + return MustGetPipeline(sid).Prefetch(params) +} + +func PrefetchMeta( + sid string, + service fileservice.FileService, + key objectio.Location, +) error { + params, err := BuildPrefetchParams(service, key) + if err != nil { + return err + } + params.typ = PrefetchMetaType + return MustGetPipeline(sid).Prefetch(params) +} diff --git a/pkg/vm/engine/tae/blockio/utils_test.go b/pkg/vm/engine/tae/blockio/utils_test.go index ff377387bbf02..b1ba80c1f9c9c 100644 --- a/pkg/vm/engine/tae/blockio/utils_test.go +++ b/pkg/vm/engine/tae/blockio/utils_test.go @@ -24,6 +24,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/objectio" + "github.com/matrixorigin/matrixone/pkg/objectio/ioutil" "github.com/matrixorigin/matrixone/pkg/testutil" 
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -40,7 +41,7 @@ func TestCoarseFilterTombstoneObject(t *testing.T) { rowids := make([]types.Rowid, rowCnt) oss := make([]objectio.ObjectStats, ssCnt) for i := 0; i < ssCnt; i++ { - writer := ConstructTombstoneWriter(objectio.HiddenColumnSelection_None, fs) + writer := ioutil.ConstructTombstoneWriter(objectio.HiddenColumnSelection_None, fs) assert.NotNil(t, writer) bat := batch.NewWithSize(2) diff --git a/pkg/vm/engine/tae/blockio/writer_test.go b/pkg/vm/engine/tae/blockio/writer_test.go index 76e41a6a29c3e..73cf9303e3a7c 100644 --- a/pkg/vm/engine/tae/blockio/writer_test.go +++ b/pkg/vm/engine/tae/blockio/writer_test.go @@ -28,6 +28,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/defines" "github.com/matrixorigin/matrixone/pkg/fileservice" "github.com/matrixorigin/matrixone/pkg/objectio" + "github.com/matrixorigin/matrixone/pkg/objectio/ioutil" "github.com/matrixorigin/matrixone/pkg/pb/api" "github.com/matrixorigin/matrixone/pkg/testutil" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" @@ -69,7 +70,7 @@ func TestWriter_WriteBlockAndZoneMap(t *testing.T) { } service, err := fileservice.NewFileService(ctx, c, nil) assert.Nil(t, err) - writer, _ := NewBlockWriterNew(service, name, 0, nil, false) + writer, _ := ioutil.NewBlockWriterNew(service, name, 0, nil, false) schema := catalog.MockSchemaAll(13, 2) bats := catalog.MockBatch(schema, 40000*2).Split(2) @@ -104,7 +105,7 @@ func TestWriter_WriteBlockAndZoneMap(t *testing.T) { mp := mpool.MustNewZero() metaloc := objectio.BuildLocation(writer.GetName(), blocks[0].GetExtent(), 40000, blocks[0].GetID()) require.NoError(t, err) - reader, err := NewObjectReader("", service, metaloc) + reader, err := ioutil.NewObjectReader(service, metaloc) require.NoError(t, err) meta, err := reader.LoadObjectMeta(context.TODO(), mp) require.NoError(t, err) @@ -178,7 +179,7 @@ func TestWriter_WriteBlockAfterAlter(t *testing.T) { 40000*2, 
schema.GetSingleSortKey().Idx, nil).Split(2) - writer, _ := NewBlockWriterNew(service, name, 1, seqnums, false) + writer, _ := ioutil.NewBlockWriterNew(service, name, 1, seqnums, false) _, err = writer.WriteBatch(containers.ToCNBatch(bats[0])) assert.Nil(t, err) _, err = writer.WriteBatch(containers.ToCNBatch(bats[1])) @@ -211,7 +212,7 @@ func TestWriter_WriteBlockAfterAlter(t *testing.T) { mp := mpool.MustNewZero() metaloc := objectio.BuildLocation(writer.GetName(), blocks[0].GetExtent(), 40000, blocks[0].GetID()) require.NoError(t, err) - reader, err := NewObjectReader("", service, metaloc) + reader, err := ioutil.NewObjectReader(service, metaloc) require.NoError(t, err) meta, err := reader.LoadObjectMeta(context.TODO(), mp) require.Equal(t, uint16(15), meta.BlockHeader().MetaColumnCount()) @@ -250,7 +251,7 @@ func TestWriter_WriteBlockAndBF(t *testing.T) { } service, err := fileservice.NewFileService(ctx, c, nil) assert.Nil(t, err) - writer, _ := NewBlockWriterNew(service, name, 0, nil, false) + writer, _ := ioutil.NewBlockWriterNew(service, name, 0, nil, false) schema := catalog.MockSchemaAll(4, 2) bat := catalog.MockBatch(schema, 100) @@ -264,7 +265,7 @@ func TestWriter_WriteBlockAndBF(t *testing.T) { mp := mpool.MustNewZero() metaloc := objectio.BuildLocation(writer.GetName(), blocks[0].GetExtent(), 100, blocks[0].GetID()) require.NoError(t, err) - reader, err := NewObjectReader("", service, metaloc) + reader, err := ioutil.NewObjectReader(service, metaloc) require.NoError(t, err) meta, err := reader.LoadObjectMeta(context.TODO(), mp) require.NoError(t, err) @@ -280,7 +281,7 @@ func TestWriter_WriteBlockAndBF(t *testing.T) { require.NoError(t, err) require.False(t, res) name = objectio.BuildObjectName(objectio.NewSegmentid(), 1) - writer2, _ := NewBlockWriterNew(service, name, 0, nil, false) + writer2, _ := ioutil.NewBlockWriterNew(service, name, 0, nil, false) writer2.SetPrimaryKeyWithType(2, 1, index.PrefixFn{ Id: 88, Fn: func(in []byte) []byte { @@ -295,7 
+296,7 @@ func TestWriter_WriteBlockAndBF(t *testing.T) { metaloc = objectio.BuildLocation(writer2.GetName(), blocks[0].GetExtent(), 100, blocks[0].GetID()) require.NoError(t, err) - reader, err = NewObjectReader("", service, metaloc) + reader, err = ioutil.NewObjectReader(service, metaloc) require.NoError(t, err) bf, _, err = reader.LoadOneBF(context.Background(), 0) assert.Nil(t, err) @@ -303,7 +304,7 @@ func TestWriter_WriteBlockAndBF(t *testing.T) { assert.Equal(t, uint8(88), bf.PrefixFnId(1)) name = objectio.BuildObjectName(objectio.NewSegmentid(), 2) - writer2, _ = NewBlockWriterNew(service, name, 0, nil, false) + writer2, _ = ioutil.NewBlockWriterNew(service, name, 0, nil, false) writer2.SetPrimaryKeyWithType(2, 2, index.PrefixFn{ Id: 123, Fn: func(in []byte) []byte { @@ -312,7 +313,7 @@ func TestWriter_WriteBlockAndBF(t *testing.T) { }) _, err = writer2.WriteBatch(containers.ToCNBatch(bat)) assert.Equal(t, index.ErrPrefix, err) - writer2, _ = NewBlockWriterNew(service, name, 0, nil, false) + writer2, _ = ioutil.NewBlockWriterNew(service, name, 0, nil, false) writer2.SetPrimaryKeyWithType(2, 2, index.PrefixFn{ Id: 123, Fn: func(in []byte) []byte { @@ -332,7 +333,7 @@ func TestWriter_WriteBlockAndBF(t *testing.T) { metaloc = objectio.BuildLocation(writer2.GetName(), blocks[0].GetExtent(), 100, blocks[0].GetID()) require.NoError(t, err) - reader, err = NewObjectReader("", service, metaloc) + reader, err = ioutil.NewObjectReader(service, metaloc) require.NoError(t, err) bf, _, err = reader.LoadOneBF(context.Background(), 0) assert.Nil(t, err) @@ -347,7 +348,7 @@ func TestConstructTombstoneWriter(t *testing.T) { mp := mpool.MustNewZero() fs := testutil.NewSharedFS() - writer := ConstructTombstoneWriter(objectio.HiddenColumnSelection_None, fs) + writer := ioutil.ConstructTombstoneWriter(objectio.HiddenColumnSelection_None, fs) assert.NotNil(t, writer) bat := batch.NewWithSize(2) diff --git a/pkg/vm/engine/tae/db/checkpoint/entry.go 
b/pkg/vm/engine/tae/db/checkpoint/entry.go index 84f2674e70bcc..91198365cac35 100644 --- a/pkg/vm/engine/tae/db/checkpoint/entry.go +++ b/pkg/vm/engine/tae/db/checkpoint/entry.go @@ -23,6 +23,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/objectio" + "github.com/matrixorigin/matrixone/pkg/objectio/ioutil" "github.com/matrixorigin/matrixone/pkg/pb/api" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" @@ -411,7 +412,7 @@ func (e *CheckpointEntry) Read( fs *objectio.ObjectFS, data *logtail.CheckpointData, ) (err error) { - reader, err := blockio.NewObjectReader(e.sid, fs.Service, e.tnLocation) + reader, err := ioutil.NewObjectReader(fs.Service, e.tnLocation) if err != nil { return } @@ -447,7 +448,7 @@ func (e *CheckpointEntry) ReadMetaIdx( fs *objectio.ObjectFS, data *logtail.CheckpointData, ) (err error) { - reader, err := blockio.NewObjectReader(e.sid, fs.Service, e.tnLocation) + reader, err := ioutil.NewObjectReader(fs.Service, e.tnLocation) if err != nil { return } @@ -457,7 +458,7 @@ func (e *CheckpointEntry) ReadMetaIdx( func (e *CheckpointEntry) GetTableByID( ctx context.Context, fs *objectio.ObjectFS, tid uint64, ) (ins, del, dataObject, tombstoneObject *api.Batch, err error) { - reader, err := blockio.NewObjectReader(e.sid, fs.Service, e.cnLocation) + reader, err := ioutil.NewObjectReader(fs.Service, e.cnLocation) if err != nil { return } diff --git a/pkg/vm/engine/tae/db/checkpoint/reader.go b/pkg/vm/engine/tae/db/checkpoint/reader.go index db2396ef40797..c6d6f7acb1934 100644 --- a/pkg/vm/engine/tae/db/checkpoint/reader.go +++ b/pkg/vm/engine/tae/db/checkpoint/reader.go @@ -25,7 +25,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/objectio/ckputil" "github.com/matrixorigin/matrixone/pkg/objectio/ioutil" - 
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio" "go.uber.org/zap" ) @@ -126,9 +125,8 @@ func (r *CKPMetaReader) Next( fname := ioutil.MakeFullName(r.dir, name) - var reader *blockio.BlockReader - if reader, err = blockio.NewFileReader( - r.sid, + var reader *ioutil.BlockReader + if reader, err = ioutil.NewFileReader( r.fs, fname, ); err != nil { diff --git a/pkg/vm/engine/tae/db/checkpoint/replay.go b/pkg/vm/engine/tae/db/checkpoint/replay.go index 3e0cc2de3305f..b47e79aec44d7 100644 --- a/pkg/vm/engine/tae/db/checkpoint/replay.go +++ b/pkg/vm/engine/tae/db/checkpoint/replay.go @@ -618,7 +618,7 @@ func MergeCkpMeta( maxFile := metaFiles[len(metaFiles)-1] - reader, err := blockio.NewFileReader(sid, fs, maxFile.GetCKPFullName()) + reader, err := ioutil.NewFileReader(fs, maxFile.GetCKPFullName()) if err != nil { return "", err } diff --git a/pkg/vm/engine/tae/db/checkpoint/snapshot.go b/pkg/vm/engine/tae/db/checkpoint/snapshot.go index d45841dbd5ac4..80784244181d3 100644 --- a/pkg/vm/engine/tae/db/checkpoint/snapshot.go +++ b/pkg/vm/engine/tae/db/checkpoint/snapshot.go @@ -25,7 +25,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/fileservice" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers" ) @@ -161,11 +160,10 @@ func loadCheckpointMeta( tmpBat *batch.Batch ) loader := func(meta *ioutil.TSRangeFile) (err error) { - var reader *blockio.BlockReader + var reader *ioutil.BlockReader var bats []*batch.Batch var closeCB func() - reader, err = blockio.NewFileReader(sid, fs, meta.GetCKPFullName()) - if err != nil { + if reader, err = ioutil.NewFileReader(fs, meta.GetCKPFullName()); err != nil { return err } bats, closeCB, err = reader.LoadAllColumns(ctx, nil, common.DebugAllocator) diff --git a/pkg/vm/engine/tae/db/gc/v3/executor.go 
b/pkg/vm/engine/tae/db/gc/v3/executor.go index 73199da1f7865..1c398ba0508c8 100644 --- a/pkg/vm/engine/tae/db/gc/v3/executor.go +++ b/pkg/vm/engine/tae/db/gc/v3/executor.go @@ -23,11 +23,12 @@ import ( "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/fileservice" "github.com/matrixorigin/matrixone/pkg/objectio" + "github.com/matrixorigin/matrixone/pkg/objectio/ioutil" + "github.com/matrixorigin/matrixone/pkg/objectio/mergeutil" "github.com/matrixorigin/matrixone/pkg/pb/plan" "github.com/matrixorigin/matrixone/pkg/pb/timestamp" "github.com/matrixorigin/matrixone/pkg/vm/engine/engine_util" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/mergesort" ) type GCJob = CheckpointBasedGCJob @@ -134,7 +135,7 @@ func (exec *GCExecutor) doFilter( // bit 0 means the row cannot be GC'ed exec.bm.Clear() exec.bm.TryExpandWithSize(bat.RowCount()) - err = mergesort.SortColumnsByIndex(bat.Vecs, 2, exec.mp) + err = mergeutil.SortColumnsByIndex(bat.Vecs, 2, exec.mp) if err != nil { return err } @@ -167,12 +168,12 @@ func (exec *GCExecutor) Run( finalCanGCSinker SinkerFn, ) (newFiles []objectio.ObjectStats, err error) { cannotGCSinker := exec.getSinker( - engine_util.WithBuffer(exec.buffer.impl, false), + ioutil.WithBuffer(exec.buffer.impl, false), ) canGCSinker := exec.getSinker( - engine_util.WithBuffer(exec.buffer.impl, false), - engine_util.WithTailSizeCap(exec.config.canGCCacheSize), + ioutil.WithBuffer(exec.buffer.impl, false), + ioutil.WithTailSizeCap(exec.config.canGCCacheSize), ) defer cannotGCSinker.Close() defer canGCSinker.Close() @@ -243,9 +244,9 @@ func (exec *GCExecutor) putBuffer(bat *batch.Batch) { } func (exec *GCExecutor) getSinker( - opts ...engine_util.SinkerOption, -) *engine_util.Sinker { - return engine_util.NewSinker( + opts ...ioutil.SinkerOption, +) *ioutil.Sinker { + return ioutil.NewSinker( ObjectTablePrimaryKeyIdx, ObjectTableAttrs, 
ObjectTableTypes, diff --git a/pkg/vm/engine/tae/db/gc/v3/merge.go b/pkg/vm/engine/tae/db/gc/v3/merge.go index 08c603f555e8c..883011713ebc3 100644 --- a/pkg/vm/engine/tae/db/gc/v3/merge.go +++ b/pkg/vm/engine/tae/db/gc/v3/merge.go @@ -21,7 +21,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/objectio/ioutil" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/mergesort" + "github.com/matrixorigin/matrixone/pkg/objectio/mergeutil" "go.uber.org/zap" "github.com/matrixorigin/matrixone/pkg/common/mpool" @@ -146,13 +146,13 @@ func MergeCheckpoint( tidColIdx := 4 objectBatch := containers.ToCNBatch(ckpData.GetObjectBatchs()) - err = mergesort.SortColumnsByIndex(objectBatch.Vecs, tidColIdx, pool) + err = mergeutil.SortColumnsByIndex(objectBatch.Vecs, tidColIdx, pool) if err != nil { return } tombstoneBatch := containers.ToCNBatch(ckpData.GetTombstoneObjectBatchs()) - err = mergesort.SortColumnsByIndex(tombstoneBatch.Vecs, tidColIdx, pool) + err = mergeutil.SortColumnsByIndex(tombstoneBatch.Vecs, tidColIdx, pool) if err != nil { return } diff --git a/pkg/vm/engine/tae/db/gc/v3/types.go b/pkg/vm/engine/tae/db/gc/v3/types.go index ce69177eafe9b..2b1cc9d973482 100644 --- a/pkg/vm/engine/tae/db/gc/v3/types.go +++ b/pkg/vm/engine/tae/db/gc/v3/types.go @@ -22,7 +22,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/objectio" - "github.com/matrixorigin/matrixone/pkg/vm/engine/engine_util" + "github.com/matrixorigin/matrixone/pkg/objectio/ioutil" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/checkpoint" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail" @@ -161,7 +161,7 @@ var ObjectTableSeqnums []uint16 var ObjectTableMetaAttrs []string var ObjectTableMetaTypes 
[]types.Type -var FSinkerFactory engine_util.FileSinkerFactory +var FSinkerFactory ioutil.FileSinkerFactory const ObjectTablePrimaryKeyIdx = 0 const ObjectTableVersion = 0 @@ -194,7 +194,7 @@ func init() { objectio.VarcharType, } - FSinkerFactory = engine_util.NewFSinkerImplFactory( + FSinkerFactory = ioutil.NewFSinkerImplFactory( ObjectTableSeqnums, ObjectTablePrimaryKeyIdx, true, diff --git a/pkg/vm/engine/tae/db/gc/v3/window.go b/pkg/vm/engine/tae/db/gc/v3/window.go index b424ecfaa70a0..c49b2f033bd94 100644 --- a/pkg/vm/engine/tae/db/gc/v3/window.go +++ b/pkg/vm/engine/tae/db/gc/v3/window.go @@ -20,6 +20,7 @@ import ( "fmt" "github.com/matrixorigin/matrixone/pkg/objectio/ioutil" + "github.com/matrixorigin/matrixone/pkg/objectio/mergeutil" "github.com/matrixorigin/matrixone/pkg/pb/plan" "github.com/matrixorigin/matrixone/pkg/pb/timestamp" @@ -35,7 +36,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/mergesort" "go.uber.org/zap" "github.com/matrixorigin/matrixone/pkg/container/types" @@ -254,16 +254,16 @@ func (w *GCWindow) ScanCheckpoints( func (w *GCWindow) getSinker( tailSize int, buffer *containers.OneSchemaBatchBuffer, -) *engine_util.Sinker { - return engine_util.NewSinker( +) *ioutil.Sinker { + return ioutil.NewSinker( ObjectTablePrimaryKeyIdx, ObjectTableAttrs, ObjectTableTypes, FSinkerFactory, w.mp, w.fs, - engine_util.WithTailSizeCap(tailSize), - engine_util.WithBuffer(buffer, false), + ioutil.WithTailSizeCap(tailSize), + ioutil.WithBuffer(buffer, false), ) } @@ -394,7 +394,7 @@ func (w *GCWindow) sortOneBatch( data *batch.Batch, mp *mpool.MPool, ) error { - if err := mergesort.SortColumnsByIndex( + if err := mergeutil.SortColumnsByIndex( data.Vecs, ObjectTablePrimaryKeyIdx, mp, @@ -464,7 +464,7 @@ func (w *GCWindow) rebuildTable(bat *batch.Batch) { 
func (w *GCWindow) replayData( ctx context.Context, bs []objectio.BlockObject, - reader *blockio.BlockReader) (*batch.Batch, func(), error) { + reader *ioutil.BlockReader) (*batch.Batch, func(), error) { idxes := []uint16{0} bat, release, err := reader.LoadColumns(ctx, idxes, nil, bs[0].GetID(), w.mp) if err != nil { @@ -491,7 +491,7 @@ func (w *GCWindow) ReadTable(ctx context.Context, name string, fs fileservice.Fi meta := ioutil.DecodeGCMetadataName(name) w.tsRange.start = *meta.GetStart() w.tsRange.end = *meta.GetEnd() - reader, err := blockio.NewFileReaderNoCache(fs, name) + reader, err := ioutil.NewFileReaderNoCache(fs, name) if err != nil { return err } diff --git a/pkg/vm/engine/tae/db/merge/policy_test.go b/pkg/vm/engine/tae/db/merge/policy_test.go index 5b2eeeab29528..c84e1da66bdb5 100644 --- a/pkg/vm/engine/tae/db/merge/policy_test.go +++ b/pkg/vm/engine/tae/db/merge/policy_test.go @@ -26,9 +26,9 @@ import ( "github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/fileservice" "github.com/matrixorigin/matrixone/pkg/objectio" + "github.com/matrixorigin/matrixone/pkg/objectio/ioutil" "github.com/matrixorigin/matrixone/pkg/pb/api" "github.com/matrixorigin/matrixone/pkg/testutil" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/txnif" @@ -367,7 +367,7 @@ func TestCheckTombstone(t *testing.T) { rowids := make([]types.Rowid, rowCnt) metas := make([]objectio.ObjectDataMeta, ssCnt) for i := 0; i < ssCnt; i++ { - writer := blockio.ConstructTombstoneWriter(objectio.HiddenColumnSelection_None, fs) + writer := ioutil.ConstructTombstoneWriter(objectio.HiddenColumnSelection_None, fs) assert.NotNil(t, writer) bat := batch.NewWithSize(2) diff --git a/pkg/vm/engine/tae/db/test/db_test.go b/pkg/vm/engine/tae/db/test/db_test.go index 
b64496315ab3a..12c3e4dc4ff73 100644 --- a/pkg/vm/engine/tae/db/test/db_test.go +++ b/pkg/vm/engine/tae/db/test/db_test.go @@ -577,7 +577,7 @@ func TestNonAppendableBlock(t *testing.T) { assert.Nil(t, err) dataBlk := obj.GetMeta().(*catalog.ObjectEntry).GetObjectData() name := objectio.BuildObjectNameWithObjectID(obj.GetID()) - writer, err := blockio.NewBlockWriterNew(dataBlk.GetFs().Service, name, 0, nil, false) + writer, err := ioutil.NewBlockWriterNew(dataBlk.GetFs().Service, name, 0, nil, false) assert.Nil(t, err) _, err = writer.WriteBatch(containers.ToCNBatch(bat)) assert.Nil(t, err) @@ -5287,7 +5287,7 @@ func TestMergeMemsize(t *testing.T) { bats := wholebat.Split(batCnt) // write only one block by apply metaloc objName1 := objectio.BuildObjectNameWithObjectID(objectio.NewObjectid()) - writer, err := blockio.NewBlockWriterNew(tae.Runtime.Fs.Service, objName1, 0, nil, false) + writer, err := ioutil.NewBlockWriterNew(tae.Runtime.Fs.Service, objName1, 0, nil, false) assert.Nil(t, err) writer.SetPrimaryKey(3) for _, b := range bats { @@ -5359,7 +5359,7 @@ func TestCollectDeletesAfterCKP(t *testing.T) { bat := catalog.MockBatch(schema, 400) // write only one block by apply metaloc objName1 := objectio.BuildObjectNameWithObjectID(objectio.NewObjectid()) - writer, err := blockio.NewBlockWriterNew(tae.Runtime.Fs.Service, objName1, 0, nil, false) + writer, err := ioutil.NewBlockWriterNew(tae.Runtime.Fs.Service, objName1, 0, nil, false) assert.Nil(t, err) writer.SetPrimaryKey(3) _, err = writer.WriteBatch(containers.ToCNBatch(bat)) @@ -5464,7 +5464,7 @@ func TestAlwaysUpdate(t *testing.T) { // write only one Object for i := 0; i < 1; i++ { objName1 := objectio.BuildObjectNameWithObjectID(objectio.NewObjectid()) - writer, err := blockio.NewBlockWriterNew(tae.Runtime.Fs.Service, objName1, 0, nil, false) + writer, err := ioutil.NewBlockWriterNew(tae.Runtime.Fs.Service, objName1, 0, nil, false) assert.Nil(t, err) writer.SetPrimaryKey(3) for _, bat := range bats[i*25 : 
(i+1)*25] { @@ -8582,7 +8582,7 @@ func TestCommitS3Blocks(t *testing.T) { statsVecs := make([]containers.Vector, 0) for _, bat := range datas { name := objectio.BuildObjectNameWithObjectID(objectio.NewObjectid()) - writer, err := blockio.NewBlockWriterNew(tae.Runtime.Fs.Service, name, 0, nil, false) + writer, err := ioutil.NewBlockWriterNew(tae.Runtime.Fs.Service, name, 0, nil, false) assert.Nil(t, err) writer.SetPrimaryKey(3) for i := 0; i < 50; i++ { @@ -8660,7 +8660,7 @@ func TestDedupSnapshot2(t *testing.T) { testutil.CreateRelation(t, tae.DB, "db", schema, true) name := objectio.BuildObjectNameWithObjectID(objectio.NewObjectid()) - writer, err := blockio.NewBlockWriterNew(tae.Runtime.Fs.Service, name, 0, nil, false) + writer, err := ioutil.NewBlockWriterNew(tae.Runtime.Fs.Service, name, 0, nil, false) assert.Nil(t, err) writer.SetPrimaryKey(3) _, err = writer.WriteBatch(containers.ToCNBatch(data)) @@ -8674,7 +8674,7 @@ func TestDedupSnapshot2(t *testing.T) { statsVec.Append(ss[:], false) name2 := objectio.BuildObjectNameWithObjectID(objectio.NewObjectid()) - writer, err = blockio.NewBlockWriterNew(tae.Runtime.Fs.Service, name2, 0, nil, false) + writer, err = ioutil.NewBlockWriterNew(tae.Runtime.Fs.Service, name2, 0, nil, false) assert.Nil(t, err) writer.SetPrimaryKey(3) _, err = writer.WriteBatch(containers.ToCNBatch(data)) @@ -8835,7 +8835,7 @@ func TestDeduplication(t *testing.T) { }) blk1Name := objectio.BuildObjectNameWithObjectID(ObjectIDs[1]) - writer, err := blockio.NewBlockWriterNew(tae.Runtime.Fs.Service, blk1Name, 0, nil, false) + writer, err := ioutil.NewBlockWriterNew(tae.Runtime.Fs.Service, blk1Name, 0, nil, false) assert.NoError(t, err) writer.SetPrimaryKey(3) writer.WriteBatch(containers.ToCNBatch(bats[0])) diff --git a/pkg/vm/engine/tae/db/testutil/engine.go b/pkg/vm/engine/tae/db/testutil/engine.go index 0d1f41e63b419..4cdf9ecc77542 100644 --- a/pkg/vm/engine/tae/db/testutil/engine.go +++ b/pkg/vm/engine/tae/db/testutil/engine.go @@ -21,6 
+21,7 @@ import ( "testing" "time" + "github.com/matrixorigin/matrixone/pkg/objectio/ioutil" "github.com/matrixorigin/matrixone/pkg/vm/engine/cmd_util" "github.com/matrixorigin/matrixone/pkg/container/batch" @@ -410,7 +411,7 @@ func writeIncrementalCheckpoint( } func tnReadCheckpoint(t *testing.T, location objectio.Location, fs fileservice.FileService) *logtail.CheckpointData { - reader, err := blockio.NewObjectReader("", fs, location) + reader, err := ioutil.NewObjectReader(fs, location) assert.NoError(t, err) data := logtail.NewCheckpointData("", common.CheckpointAllocator) err = data.ReadFrom( diff --git a/pkg/vm/engine/tae/db/testutil/funcs.go b/pkg/vm/engine/tae/db/testutil/funcs.go index c41c99c797ba8..f17ab4dce1f01 100644 --- a/pkg/vm/engine/tae/db/testutil/funcs.go +++ b/pkg/vm/engine/tae/db/testutil/funcs.go @@ -23,7 +23,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/objectio" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio" + "github.com/matrixorigin/matrixone/pkg/objectio/ioutil" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers" @@ -584,7 +584,7 @@ func MockCNDeleteInS3( bat.AddVector(objectio.TombstoneAttr_Rowid_Attr, rowIDVec) bat.AddVector("pk", pkVec) name := objectio.MockObjectName() - writer, err := blockio.NewBlockWriterNew(fs.Service, name, 0, nil, true) + writer, err := ioutil.NewBlockWriterNew(fs.Service, name, 0, nil, true) writer.SetPrimaryKeyWithType(uint16(objectio.TombstonePrimaryKeyIdx), index.HBF, index.ObjectPrefixFn, index.BlockPrefixFn) diff --git a/pkg/vm/engine/tae/logtail/backup.go b/pkg/vm/engine/tae/logtail/backup.go index baf43bdc68edd..1dc756a844c0b 100644 --- a/pkg/vm/engine/tae/logtail/backup.go +++ b/pkg/vm/engine/tae/logtail/backup.go @@ -21,6 +21,7 @@ import ( 
"github.com/matrixorigin/matrixone/pkg/common/mpool" "github.com/matrixorigin/matrixone/pkg/container/vector" + "github.com/matrixorigin/matrixone/pkg/objectio/ioutil" "github.com/matrixorigin/matrixone/pkg/pb/plan" "github.com/matrixorigin/matrixone/pkg/util/fault" "github.com/matrixorigin/matrixone/pkg/vm/engine" @@ -301,7 +302,7 @@ func GetCheckpointData( } data := NewCheckpointData(sid, common.CheckpointAllocator) - reader, err := blockio.NewObjectReader(sid, fs, location) + reader, err := ioutil.NewObjectReader(fs, location) if err != nil { return nil, err } @@ -582,7 +583,7 @@ func ReWriteCheckpointAndBlockFromKey( insertBatchFun := func( objsData map[string]*objData, - initData func(*objData, *blockio.BlockWriter) (bool, error), + initData func(*objData, *ioutil.BlockWriter) (bool, error), ) error { for _, objectData := range objsData { if insertObjBatch[objectData.tid] == nil { @@ -596,8 +597,8 @@ func ReWriteCheckpointAndBlockFromKey( fileNum := uint16(1000) + objectName.Num() segment := objectName.SegmentId() name := objectio.BuildObjectName(&segment, fileNum) - var writer *blockio.BlockWriter - writer, err = blockio.NewBlockWriter(dstFs, name.String()) + var writer *ioutil.BlockWriter + writer, err = ioutil.NewBlockWriter(dstFs, name.String()) if err != nil { return err } @@ -650,7 +651,7 @@ func ReWriteCheckpointAndBlockFromKey( err = insertBatchFun( objectsData, - func(oData *objData, writer *blockio.BlockWriter) (bool, error) { + func(oData *objData, writer *ioutil.BlockWriter) (bool, error) { ds := NewBackupDeltaLocDataSource(ctx, fs, ts, tombstonesData) blk := oData.stats.ConstructBlockInfo(uint16(0)) bat, sortKey, err := blockio.BlockDataReadBackup(ctx, &blk, ds, nil, ts, fs) @@ -684,7 +685,7 @@ func ReWriteCheckpointAndBlockFromKey( err = insertBatchFun( tombstonesData, - func(oData *objData, writer *blockio.BlockWriter) (bool, error) { + func(oData *objData, writer *ioutil.BlockWriter) (bool, error) { if oData.data[0].Vecs[0].Length() == 0 { 
logutil.Info("[Data Empty] ReWrite Checkpoint", zap.String("tombstone", oData.stats.ObjectName().String()), diff --git a/pkg/vm/engine/tae/logtail/handle.go b/pkg/vm/engine/tae/logtail/handle.go index da5fdb8817d4f..71020f66f2eab 100644 --- a/pkg/vm/engine/tae/logtail/handle.go +++ b/pkg/vm/engine/tae/logtail/handle.go @@ -82,6 +82,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/fileservice" "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/objectio" + "github.com/matrixorigin/matrixone/pkg/objectio/ioutil" "github.com/matrixorigin/matrixone/pkg/pb/api" "github.com/matrixorigin/matrixone/pkg/util/fault" v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2" @@ -442,7 +443,7 @@ func LoadCheckpointEntries( datas := make([]*CNCheckpointData, len(locationsAndVersions)/2) - readers := make([]*blockio.BlockReader, len(locationsAndVersions)/2) + readers := make([]*ioutil.BlockReader, len(locationsAndVersions)/2) objectLocations := make([]objectio.Location, len(locationsAndVersions)/2) versions := make([]uint32, len(locationsAndVersions)/2) locations := make([]objectio.Location, len(locationsAndVersions)/2) @@ -463,7 +464,7 @@ func LoadCheckpointEntries( return nil, nil, err } locations[i/2] = location - reader, err := blockio.NewObjectReader(sid, fs, location) + reader, err := ioutil.NewObjectReader(fs, location) if err != nil { return nil, nil, err } diff --git a/pkg/vm/engine/tae/logtail/reader.go b/pkg/vm/engine/tae/logtail/reader.go index 1c11695f7fa58..b924dc3481645 100644 --- a/pkg/vm/engine/tae/logtail/reader.go +++ b/pkg/vm/engine/tae/logtail/reader.go @@ -23,8 +23,8 @@ import ( "github.com/matrixorigin/matrixone/pkg/fileservice" "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/objectio" + "github.com/matrixorigin/matrixone/pkg/objectio/ioutil" "github.com/matrixorigin/matrixone/pkg/pb/plan" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio" 
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/txnif" @@ -145,7 +145,7 @@ func MakeGlobalCheckpointDataReader( fs: fs, version: version, } - reader, err := blockio.NewObjectReader(sid, fs, location) + reader, err := ioutil.NewObjectReader(fs, location) if err != nil { return nil, err } @@ -184,8 +184,7 @@ func (r *CheckpointReader) LoadBatchData( return true, nil } key := r.locations[0] - var reader *blockio.BlockReader - reader, err := blockio.NewObjectReader(r.sid, r.fs, key) + reader, err := ioutil.NewObjectReader(r.fs, key) if err != nil { return false, err } diff --git a/pkg/vm/engine/tae/logtail/snapshot.go b/pkg/vm/engine/tae/logtail/snapshot.go index 596a6adb6aada..c24f292ec8e23 100644 --- a/pkg/vm/engine/tae/logtail/snapshot.go +++ b/pkg/vm/engine/tae/logtail/snapshot.go @@ -23,6 +23,7 @@ import ( "time" "github.com/matrixorigin/matrixone/pkg/container/batch" + "github.com/matrixorigin/matrixone/pkg/objectio/ioutil" "go.uber.org/zap" @@ -1281,7 +1282,7 @@ func (sm *SnapshotMeta) ReadMeta(ctx context.Context, name string, fs fileservic default: } - reader, err := blockio.NewFileReaderNoCache(fs, name) + reader, err := ioutil.NewFileReaderNoCache(fs, name) if err != nil { return err } @@ -1342,7 +1343,7 @@ func (sm *SnapshotMeta) ReadMeta(ctx context.Context, name string, fs fileservic } func (sm *SnapshotMeta) ReadTableInfo(ctx context.Context, name string, fs fileservice.FileService) error { - reader, err := blockio.NewFileReaderNoCache(fs, name) + reader, err := ioutil.NewFileReaderNoCache(fs, name) if err != nil { return err } diff --git a/pkg/vm/engine/tae/logtail/storage_usage.go b/pkg/vm/engine/tae/logtail/storage_usage.go index 224232daa83f3..000ea82908cf8 100644 --- a/pkg/vm/engine/tae/logtail/storage_usage.go +++ b/pkg/vm/engine/tae/logtail/storage_usage.go @@ -37,8 +37,8 @@ import ( 
"github.com/matrixorigin/matrixone/pkg/fileservice" "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/objectio" + "github.com/matrixorigin/matrixone/pkg/objectio/ioutil" v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers" "github.com/tidwall/btree" @@ -1405,7 +1405,7 @@ func loadMetaBat( data.PrefetchMetaIdx(ctx, versions[idx], idxes, locations[idx], fs) // 1.2. read meta bat - reader, err := blockio.NewObjectReader(sid, fs, locations[idx]) + reader, err := ioutil.NewObjectReader(fs, locations[idx]) if err != nil { return nil, nil, nil, err } @@ -1437,7 +1437,7 @@ func loadStorageUsageBatch( for it.HasNext() { block := it.Next() schema := checkpointDataReferVersions[version][uint32(batIdx)] - reader, err := blockio.NewObjectReader(sid, fs, block.GetLocation()) + reader, err := ioutil.NewObjectReader(fs, block.GetLocation()) if err != nil { return nil, err } diff --git a/pkg/vm/engine/tae/logtail/utils.go b/pkg/vm/engine/tae/logtail/utils.go index 424a311099d47..4841022ed82f3 100644 --- a/pkg/vm/engine/tae/logtail/utils.go +++ b/pkg/vm/engine/tae/logtail/utils.go @@ -28,6 +28,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/fileservice" "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/objectio" + "github.com/matrixorigin/matrixone/pkg/objectio/ioutil" "github.com/matrixorigin/matrixone/pkg/pb/api" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" @@ -534,7 +535,7 @@ func switchCheckpointIdx(i uint16) uint16 { } func (data *CNCheckpointData) InitMetaIdx( - ctx context.Context, version uint32, reader *blockio.BlockReader, + ctx context.Context, version uint32, reader *ioutil.BlockReader, location objectio.Location, m 
*mpool.MPool, ) error { if data.bats[MetaIDX] == nil { @@ -677,7 +678,7 @@ func (data *CNCheckpointData) ReadFromData( ctx context.Context, tableID uint64, location objectio.Location, - reader *blockio.BlockReader, + reader *ioutil.BlockReader, version uint32, m *mpool.MPool, ) (dataBats []*batch.Batch, err error) { @@ -697,7 +698,7 @@ func (data *CNCheckpointData) ReadFromData( block := it.Next() var bat, windowBat *batch.Batch schema := checkpointDataReferVersions[version][uint32(idx)] - reader, err = blockio.NewObjectReader(data.sid, reader.GetObjectReader().GetObject().GetFs(), block.GetLocation()) + reader, err = ioutil.NewObjectReader(reader.GetObjectReader().GetObject().GetFs(), block.GetLocation()) if err != nil { return } @@ -1008,7 +1009,7 @@ func (data *CheckpointData) WriteTo( segmentid := objectio.NewSegmentid() fileNum := uint16(0) name := objectio.BuildObjectName(segmentid, fileNum) - writer, err := blockio.NewBlockWriterNew(fs, name, 0, nil, false) + writer, err := ioutil.NewBlockWriterNew(fs, name, 0, nil, false) if err != nil { return } @@ -1031,13 +1032,13 @@ func (data *CheckpointData) WriteTo( var blks []objectio.BlockObject if objectSize > checkpointSize { fileNum++ - blks, _, err = writer.Sync(context.Background()) + blks, _, err = writer.Sync(ctx) if err != nil { return } checkpointFiles = append(checkpointFiles, name.String()) name = objectio.BuildObjectName(segmentid, fileNum) - writer, err = blockio.NewBlockWriterNew(fs, name, 0, nil, false) + writer, err = ioutil.NewBlockWriterNew(fs, name, 0, nil, false) if err != nil { return } @@ -1081,7 +1082,7 @@ func (data *CheckpointData) WriteTo( } } } - blks, _, err := writer.Sync(context.Background()) + blks, _, err := writer.Sync(ctx) if err != nil { return } @@ -1151,27 +1152,27 @@ func (data *CheckpointData) WriteTo( segmentid2 := objectio.NewSegmentid() name2 := objectio.BuildObjectName(segmentid2, 0) - writer2, err := blockio.NewBlockWriterNew(fs, name2, 0, nil, false) + writer2, err := 
ioutil.NewBlockWriterNew(fs, name2, 0, nil, false) if err != nil { return } if _, _, err = writer2.WriteSubBatch( containers.ToCNBatch(data.bats[MetaIDX]), - objectio.ConvertToSchemaType(uint16(MetaIDX))); err != nil { - return - } - if err != nil { + objectio.ConvertToSchemaType(uint16(MetaIDX)), + ); err != nil { return } if _, _, err = writer2.WriteSubBatch( containers.ToCNBatch(data.bats[TNMetaIDX]), - objectio.ConvertToSchemaType(uint16(TNMetaIDX))); err != nil { + objectio.ConvertToSchemaType(uint16(TNMetaIDX)), + ); err != nil { return } - if err != nil { + + var blks2 []objectio.BlockObject + if blks2, _, err = writer2.Sync(ctx); err != nil { return } - blks2, _, err := writer2.Sync(context.Background()) CNLocation = objectio.BuildLocation(name2, blks2[0].GetExtent(), 0, blks2[0].GetID()) TNLocation = objectio.BuildLocation(name2, blks2[1].GetExtent(), 0, blks2[1].GetID()) return @@ -1183,7 +1184,7 @@ func LoadBlkColumnsByMeta( colTypes []types.Type, colNames []string, id uint16, - reader *blockio.BlockReader, + reader *ioutil.BlockReader, mp *mpool.MPool, ) ([]*containers.Batch, error) { idxs := make([]uint16, len(colNames)) @@ -1230,7 +1231,7 @@ func LoadCNSubBlkColumnsByMeta( colTypes []types.Type, colNames []string, id uint16, - reader *blockio.BlockReader, + reader *ioutil.BlockReader, m *mpool.MPool, ) ([]*batch.Batch, error) { idxs := make([]uint16, len(colNames)) @@ -1270,7 +1271,7 @@ func LoadCNSubBlkColumnsByMetaWithId( dataType uint16, id uint16, version uint32, - reader *blockio.BlockReader, + reader *ioutil.BlockReader, m *mpool.MPool, ) (bat *batch.Batch, err error) { idxs := make([]uint16, len(colNames)) @@ -1296,7 +1297,7 @@ func (data *CheckpointData) ReadTNMetaBatch( ctx context.Context, version uint32, location objectio.Location, - reader *blockio.BlockReader, + reader *ioutil.BlockReader, ) (err error) { if data.bats[TNMetaIDX].Length() == 0 { var bats []*containers.Batch @@ -1357,7 +1358,7 @@ func (data *CheckpointData) ReadFrom( ctx 
context.Context, version uint32, location objectio.Location, - reader *blockio.BlockReader, + reader *ioutil.BlockReader, fs fileservice.FileService, ) (err error) { err = data.readMetaBatch(ctx, version, reader, data.allocator) @@ -1388,8 +1389,8 @@ func LoadCheckpointLocations( data := NewCheckpointData(sid, common.CheckpointAllocator) defer data.Close() - var reader *blockio.BlockReader - if reader, err = blockio.NewObjectReader(sid, fs, location); err != nil { + var reader *ioutil.BlockReader + if reader, err = ioutil.NewObjectReader(fs, location); err != nil { return nil, err } @@ -1422,8 +1423,8 @@ func LoadSpecifiedCkpBatch( err = moerr.NewInvalidArgNoCtx("out of bound batchIdx", batchIdx) return } - var reader *blockio.BlockReader - if reader, err = blockio.NewObjectReader(sid, fs, location); err != nil { + var reader *ioutil.BlockReader + if reader, err = ioutil.NewObjectReader(fs, location); err != nil { return } @@ -1433,7 +1434,7 @@ func LoadSpecifiedCkpBatch( data.replayMetaBatch(version) for _, val := range data.locations { - if reader, err = blockio.NewObjectReader(sid, fs, val); err != nil { + if reader, err = ioutil.NewObjectReader(fs, val); err != nil { return } var bats []*containers.Batch @@ -1458,7 +1459,7 @@ func LoadSpecifiedCkpBatch( func (data *CheckpointData) readMetaBatch( ctx context.Context, version uint32, - reader *blockio.BlockReader, + reader *ioutil.BlockReader, _ *mpool.MPool, ) (err error) { if data.bats[MetaIDX].Length() == 0 { @@ -1543,8 +1544,8 @@ func (data *CheckpointData) readAll( checkpointDataSize := uint64(0) readDuration := time.Now() for _, val := range data.locations { - var reader *blockio.BlockReader - reader, err = blockio.NewObjectReader(data.sid, service, val) + var reader *ioutil.BlockReader + reader, err = ioutil.NewObjectReader(service, val) if err != nil { return } diff --git a/pkg/vm/engine/tae/mergesort/func.go b/pkg/vm/engine/tae/mergesort/func.go index 219e7b4bdfde7..417d7b0d27e45 100644 --- 
a/pkg/vm/engine/tae/mergesort/func.go +++ b/pkg/vm/engine/tae/mergesort/func.go @@ -15,33 +15,13 @@ package mergesort import ( - "github.com/matrixorigin/matrixone/pkg/common/mpool" "github.com/matrixorigin/matrixone/pkg/container/batch" - "github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/sort" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers" ) /// merge things -func SortColumnsByIndex( - cols []*vector.Vector, sortIdx int, mp *mpool.MPool, -) (err error) { - sortKey := cols[sortIdx] - sortedIdx := make([]int64, sortKey.Length()) - for i := 0; i < len(sortedIdx); i++ { - sortedIdx[i] = int64(i) - } - sort.Sort(false, false, true, sortedIdx, sortKey) - for i := 0; i < len(cols); i++ { - err = cols[i].Shuffle(sortedIdx, mp) - if err != nil { - return - } - } - return -} - func SortBlockColumns( cols []containers.Vector, pk int, pool *containers.VectorPool, ) ([]int64, error) { diff --git a/pkg/vm/engine/tae/mergesort/merger.go b/pkg/vm/engine/tae/mergesort/merger.go index e2825b77ab126..2b96b0486ec43 100644 --- a/pkg/vm/engine/tae/mergesort/merger.go +++ b/pkg/vm/engine/tae/mergesort/merger.go @@ -23,9 +23,9 @@ import ( "github.com/matrixorigin/matrixone/pkg/container/nulls" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/container/vector" + "github.com/matrixorigin/matrixone/pkg/objectio/ioutil" "github.com/matrixorigin/matrixone/pkg/pb/api" "github.com/matrixorigin/matrixone/pkg/sort" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio" ) type Merger interface { @@ -96,7 +96,7 @@ type merger[T comparable] struct { host MergeTaskHost - writer *blockio.BlockWriter + writer *ioutil.BlockWriter sortKeyIdx int diff --git a/pkg/vm/engine/tae/mergesort/reshaper.go b/pkg/vm/engine/tae/mergesort/reshaper.go index dbebca1d5be59..53904855c820d 100644 --- a/pkg/vm/engine/tae/mergesort/reshaper.go +++ b/pkg/vm/engine/tae/mergesort/reshaper.go @@ -19,8 +19,8 @@ 
import ( "errors" "github.com/matrixorigin/matrixone/pkg/container/batch" + "github.com/matrixorigin/matrixone/pkg/objectio/ioutil" "github.com/matrixorigin/matrixone/pkg/pb/api" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio" ) func reshape(ctx context.Context, host MergeTaskHost) error { @@ -44,7 +44,7 @@ func reshape(ctx context.Context, host MergeTaskHost) error { accObjBlkCnts := host.GetAccBlkCnts() transferMaps := host.GetTransferMaps() - var writer *blockio.BlockWriter + var writer *ioutil.BlockWriter var buffer *batch.Batch var releaseF func() defer func() { @@ -141,7 +141,7 @@ func reshape(ctx context.Context, host MergeTaskHost) error { return nil } -func syncObject(ctx context.Context, writer *blockio.BlockWriter, commitEntry *api.MergeCommitEntry) error { +func syncObject(ctx context.Context, writer *ioutil.BlockWriter, commitEntry *api.MergeCommitEntry) error { if _, _, err := writer.Sync(ctx); err != nil { return err } diff --git a/pkg/vm/engine/tae/mergesort/task.go b/pkg/vm/engine/tae/mergesort/task.go index 227bee6742df3..895e9b9716c67 100644 --- a/pkg/vm/engine/tae/mergesort/task.go +++ b/pkg/vm/engine/tae/mergesort/task.go @@ -29,9 +29,9 @@ import ( "github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/objectio" + "github.com/matrixorigin/matrixone/pkg/objectio/ioutil" "github.com/matrixorigin/matrixone/pkg/pb/api" v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" "go.uber.org/zap" ) @@ -51,7 +51,7 @@ type MergeTaskHost interface { GetCommitEntry() *api.MergeCommitEntry InitTransferMaps(blkCnt int) GetTransferMaps() api.TransferMaps - PrepareNewWriter() *blockio.BlockWriter + PrepareNewWriter() *ioutil.BlockWriter DoTransfer() bool GetObjectCnt() int GetBlkCnts() []int diff --git 
a/pkg/vm/engine/tae/rpc/handle_debug.go b/pkg/vm/engine/tae/rpc/handle_debug.go index bbc470a46b994..c64cff406d918 100644 --- a/pkg/vm/engine/tae/rpc/handle_debug.go +++ b/pkg/vm/engine/tae/rpc/handle_debug.go @@ -32,13 +32,13 @@ import ( "github.com/matrixorigin/matrixone/pkg/fileservice" "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/objectio" + "github.com/matrixorigin/matrixone/pkg/objectio/ioutil" "github.com/matrixorigin/matrixone/pkg/pb/api" "github.com/matrixorigin/matrixone/pkg/pb/timestamp" "github.com/matrixorigin/matrixone/pkg/pb/txn" "github.com/matrixorigin/matrixone/pkg/util/fault" v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2" "github.com/matrixorigin/matrixone/pkg/vm/engine/cmd_util" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio" catalog2 "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers" @@ -763,7 +763,7 @@ func marshalTransferMaps( req.BookingLoc = req.BookingLoc[blkCnt+1:] locations := req.BookingLoc for _, filepath := range locations { - reader, err := blockio.NewFileReader(sid, fs, filepath) + reader, err := ioutil.NewFileReader(fs, filepath) if err != nil { return nil, err } diff --git a/pkg/vm/engine/tae/rpc/inspect.go b/pkg/vm/engine/tae/rpc/inspect.go index bb612c58a6c15..aadf7f98fd0cc 100644 --- a/pkg/vm/engine/tae/rpc/inspect.go +++ b/pkg/vm/engine/tae/rpc/inspect.go @@ -104,6 +104,9 @@ func initCommand(_ context.Context, inspectCtx *inspectContext) *cobra.Command { inspect := &MoInspectArg{} rootCmd.AddCommand(inspect.PrepareCommand()) + gc := &GCArg{} + rootCmd.AddCommand(gc.PrepareCommand()) + return rootCmd } diff --git a/pkg/vm/engine/tae/rpc/rpc_test.go b/pkg/vm/engine/tae/rpc/rpc_test.go index c902a5a29bb47..60c68ab85bc55 100644 --- a/pkg/vm/engine/tae/rpc/rpc_test.go +++ b/pkg/vm/engine/tae/rpc/rpc_test.go @@ -27,11 +27,11 @@ 
import ( "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/objectio" + "github.com/matrixorigin/matrixone/pkg/objectio/ioutil" "github.com/matrixorigin/matrixone/pkg/pb/api" "github.com/matrixorigin/matrixone/pkg/pb/plan" "github.com/matrixorigin/matrixone/pkg/pb/txn" "github.com/matrixorigin/matrixone/pkg/vm/engine" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers" @@ -73,7 +73,7 @@ func TestHandle_HandleCommitPerformanceForS3Load(t *testing.T) { for i := 0; i < 100; i++ { name := objectio.BuildObjectNameWithObjectID(objectio.NewObjectid()) objNames = append(objNames, name) - writer, err := blockio.NewBlockWriterNew(fs, objNames[i], 0, nil, false) + writer, err := ioutil.NewBlockWriterNew(fs, objNames[i], 0, nil, false) assert.Nil(t, err) for j := 0; j < 50; j++ { _, err = writer.WriteBatch(containers.ToCNBatch(taeBats[offset+j])) @@ -221,7 +221,7 @@ func TestHandle_HandlePreCommitWriteS3(t *testing.T) { //write taeBats[0], taeBats[1] two blocks into file service objName1 := objectio.BuildObjectNameWithObjectID(objectio.NewObjectid()) - writer, err := blockio.NewBlockWriterNew(fs, objName1, 0, nil, false) + writer, err := ioutil.NewBlockWriterNew(fs, objName1, 0, nil, false) assert.Nil(t, err) writer.SetPrimaryKey(1) for i, bat := range taeBats { @@ -240,7 +240,7 @@ func TestHandle_HandlePreCommitWriteS3(t *testing.T) { //write taeBats[3] into file service objName2 := objectio.BuildObjectNameWithObjectID(objectio.NewObjectid()) - writer, err = blockio.NewBlockWriterNew(fs, objName2, 0, nil, false) + writer, err = ioutil.NewBlockWriterNew(fs, objName2, 0, nil, false) assert.Nil(t, err) writer.SetPrimaryKey(1) _, err = 
writer.WriteBatch(containers.ToCNBatch(taeBats[3])) @@ -430,7 +430,7 @@ func TestHandle_HandlePreCommitWriteS3(t *testing.T) { //write deleted row ids into FS objName3 := objectio.BuildObjectNameWithObjectID(objectio.NewObjectid()) - writer, err = blockio.NewBlockWriterNew(fs, objName3, 0, nil, true) + writer, err = ioutil.NewBlockWriterNew(fs, objName3, 0, nil, true) assert.Nil(t, err) writer.SetPrimaryKeyWithType(uint16(objectio.TombstonePrimaryKeyIdx), index.HBF, index.ObjectPrefixFn, diff --git a/pkg/vm/engine/tae/rpc/tool.go b/pkg/vm/engine/tae/rpc/tool.go index 193028e35130e..822ae91825965 100644 --- a/pkg/vm/engine/tae/rpc/tool.go +++ b/pkg/vm/engine/tae/rpc/tool.go @@ -15,8 +15,10 @@ package rpc import ( + "bufio" "context" "fmt" + "os" "path/filepath" "strconv" "strings" @@ -32,6 +34,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/defines" "github.com/matrixorigin/matrixone/pkg/fileservice" "github.com/matrixorigin/matrixone/pkg/objectio" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/checkpoint" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail" ) @@ -142,6 +145,9 @@ func (c *MoInspectArg) PrepareCommand() *cobra.Command { ckp := CheckpointArg{} moInspectCmd.AddCommand(ckp.PrepareCommand()) + gc := gcRemoveArg{} + moInspectCmd.AddCommand(gc.PrepareCommand()) + return moInspectCmd } @@ -1279,3 +1285,350 @@ func (c *ckpListArg) getTableList(ctx context.Context) (res string, err error) { return } + +type GCArg struct { +} + +func (c *GCArg) PrepareCommand() *cobra.Command { + gcCmd := &cobra.Command{ + Use: "gc", + Short: "gc", + Long: "Display information about a given gc", + Run: RunFactory(c), + } + + gcCmd.SetUsageTemplate(c.Usage()) + + dump := gcDumpArg{} + gcCmd.AddCommand(dump.PrepareCommand()) + + remove := gcRemoveArg{} + gcCmd.AddCommand(remove.PrepareCommand()) + + return gcCmd +} + +func (c *GCArg) FromCommand(cmd *cobra.Command) (err error) { + return nil 
+} + +func (c *GCArg) String() string { + return "gc" +} + +func (c *GCArg) Usage() (res string) { + res += "Available Commands:\n" + res += fmt.Sprintf(" %-5v show gc information\n", "stat") + + res += "\n" + res += "Usage:\n" + res += "inspect table [flags] [options]\n" + + res += "\n" + res += "Use \"mo-tool inspect table --help\" for more information about a given command.\n" + + return +} + +func (c *GCArg) Run() error { + return nil +} + +type gcDumpArg struct { + ctx *inspectContext + file string + res string +} + +func (c *gcDumpArg) PrepareCommand() *cobra.Command { + gcDumpCmd := &cobra.Command{ + Use: "dump", + Short: "gc dump", + Long: "Display information about a given gc", + Run: RunFactory(c), + } + + gcDumpCmd.SetUsageTemplate(c.Usage()) + + gcDumpCmd.Flags().StringP("file", "f", "", "file to dump") + + return gcDumpCmd +} + +func (c *gcDumpArg) FromCommand(cmd *cobra.Command) (err error) { + if cmd.Flag("ictx") != nil { + c.ctx = cmd.Flag("ictx").Value.(*inspectContext) + } + c.file, _ = cmd.Flags().GetString("file") + return nil +} + +func (c *gcDumpArg) String() string { + return c.res +} + +func (c *gcDumpArg) Usage() (res string) { + res += "Examples:\n" + res += " # Dump the pinned objects to the file\n" + res += " inspect gc dump -f /your/path/file" + return +} + +func (c *gcDumpArg) Run() (err error) { + if c.ctx == nil { + return moerr.NewInfoNoCtx("it is an online command") + } + ctx := context.Background() + now := time.Now().Unix() + + err = os.MkdirAll(filepath.Dir(c.file), 0755) + if err != nil { + err = moerr.NewInternalErrorNoCtx(fmt.Sprintf("Error creating directory: %v", err)) + return + } + file, err := os.OpenFile(c.file, os.O_CREATE|os.O_WRONLY|os.O_APPEND|os.O_TRUNC, 0644) + if err != nil { + return + } + defer file.Close() + + pinnedObjects := make(map[string]bool) + + if err = c.getInMemObjects(pinnedObjects); err != nil { + return + } + if err = c.getCheckpointObject(ctx, pinnedObjects); err != nil { + return + } + + for obj 
:= range pinnedObjects { + _, err = file.WriteString(obj + "\n") + if err != nil { + err = moerr.NewInternalErrorNoCtx(fmt.Sprintf("Error writing to file: %v", err)) + return + } + } + + c.res = fmt.Sprintf("Dumped pinned objects to file, file count %v, start tmie %v", len(pinnedObjects), now) + + return +} + +func (c *gcDumpArg) getInMemObjects(pinnedObjects map[string]bool) (err error) { + dbIt := c.ctx.db.Catalog.MakeDBIt(false) + for dbIt.Valid() { + db := dbIt.Get().GetPayload() + tableIt := db.MakeTableIt(false) + for tableIt.Valid() { + table := tableIt.Get().GetPayload() + lp := new(catalog.LoopProcessor) + lp.TombstoneFn = func(be *catalog.ObjectEntry) error { + pinnedObjects[be.ObjectName().String()] = true + return nil + } + lp.ObjectFn = func(be *catalog.ObjectEntry) error { + pinnedObjects[be.ObjectName().String()] = true + return nil + } + if err = table.RecurLoop(lp); err != nil { + return + } + tableIt.Next() + } + dbIt.Next() + } + return +} + +func (c *gcDumpArg) getCheckpointObject(ctx context.Context, pinned map[string]bool) (err error) { + entries := c.ctx.db.BGCheckpointRunner.GetAllCheckpoints() + for _, entry := range entries { + cnLoc := entry.GetLocation() + cnObj := cnLoc.Name().String() + pinned[cnObj] = true + + tnLoc := entry.GetTNLocation() + tnObj := tnLoc.Name().String() + pinned[tnObj] = true + + data, err := getCkpData(ctx, entry, c.ctx.db.Runtime.Fs) + if err != nil { + return moerr.NewInfoNoCtx(fmt.Sprintf("failed to get checkpoint data %v, %v", entry.LSN(), err)) + } + getObjectsFromCkpMeta(data, pinned) + getObjectsFromCkpData(data, pinned) + } + return +} + +func getObjectsFromCkpMeta(data *logtail.CheckpointData, pinned map[string]bool) { + bats := data.GetBatches() + + metaBat := bats[logtail.MetaIDX] + metaAttr := logtail.MetaSchemaAttr + for _, attr := range metaAttr { + if attr == logtail.SnapshotAttr_TID { + continue + } + vec := metaBat.GetVectorByName(attr) + for i := 0; i < vec.Length(); i++ { + v := 
vec.Get(i).([]byte) + if len(v) == 0 { + continue + } + loc := objectio.Location(v) + obj := loc.Name().String() + pinned[obj] = true + } + } + + tnBat := bats[logtail.TNMetaIDX] + vec := tnBat.GetVectorByName(logtail.CheckpointMetaAttr_BlockLocation) + for i := 0; i < vec.Length(); i++ { + v := vec.Get(i).([]byte) + if len(v) == 0 { + continue + } + loc := objectio.Location(v) + obj := loc.Name().String() + pinned[obj] = true + } +} + +func getObjectsFromCkpData(data *logtail.CheckpointData, pinned map[string]bool) { + bat := data.GetObjectBatchs() + vec := bat.GetVectorByName(logtail.ObjectAttr_ObjectStats) + for i := 0; i < vec.Length(); i++ { + v := vec.Get(i).([]byte) + obj := objectio.ObjectStats(v) + pinned[obj.ObjectName().String()] = true + } + + bat = data.GetTombstoneObjectBatchs() + vec = bat.GetVectorByName(logtail.ObjectAttr_ObjectStats) + for i := 0; i < vec.Length(); i++ { + v := vec.Get(i).([]byte) + obj := objectio.ObjectStats(v) + pinned[obj.ObjectName().String()] = true + } +} + +type gcRemoveArg struct { + file string + oriDir string + tarDir string + modTime int64 + dry bool + res string +} + +func (c *gcRemoveArg) PrepareCommand() *cobra.Command { + gcRemoveCmd := &cobra.Command{ + Use: "remove", + Short: "gc remove", + Long: "Remove objects from the given file", + Run: RunFactory(c), + } + + gcRemoveCmd.SetUsageTemplate(c.Usage()) + + gcRemoveCmd.Flags().StringP("file", "f", "", "file to remove") + gcRemoveCmd.Flags().StringP("ori", "o", "", "original directory") + gcRemoveCmd.Flags().StringP("tar", "t", "", "target directory") + gcRemoveCmd.Flags().Int64P("mod", "m", 0, "modified time") + gcRemoveCmd.Flags().BoolP("dry", "", false, "dry run") + + return gcRemoveCmd +} + +func (c *gcRemoveArg) FromCommand(cmd *cobra.Command) (err error) { + c.file, _ = cmd.Flags().GetString("file") + c.oriDir, _ = cmd.Flags().GetString("ori") + c.tarDir, _ = cmd.Flags().GetString("tar") + c.modTime, _ = cmd.Flags().GetInt64("mod") + c.dry, _ = 
cmd.Flags().GetBool("dry") + return nil +} + +func (c *gcRemoveArg) String() string { + return c.res +} + +func (c *gcRemoveArg) Usage() (res string) { + res += "Examples:\n" + res += " # Remove objects from the given file\n" + res += " inspect gc remove -f file -o ori -t tar\n" + res += " # Remove objects from the given file with modified time\n" + res += " inspect gc remove -f file -o ori -t tar -m 1620000000" + res += " # Dry run to remove objects from the given file\n" + res += " inspect gc remove -f file -o ori -t tar -d" + return +} + +func (c *gcRemoveArg) Run() (err error) { + if c.file == "" || c.oriDir == "" || c.tarDir == "" { + return moerr.NewInfoNoCtx("invalid inputs") + } + + file, err := os.Open(c.file) + if err != nil { + return moerr.NewInfoNoCtx(fmt.Sprintf("failed to open file %v, %v", c.file, err)) + } + defer file.Close() + + pinned := make(map[string]bool) + scanner := bufio.NewScanner(file) + for scanner.Scan() { + obj := scanner.Text() + pinned[obj] = true + } + + files, err := os.ReadDir(c.oriDir) + if err != nil { + return moerr.NewInfoNoCtx(fmt.Sprintf("failed to read directory %v, %v", c.oriDir, err)) + } + + err = os.MkdirAll(c.tarDir, 0755) + if err != nil { + err = moerr.NewInternalErrorNoCtx(fmt.Sprintf("Error creating directory: %v", err)) + return + } + + toMove := make([]string, 0) + for _, obj := range files { + if obj.IsDir() || pinned[obj.Name()] { + continue + } + info, err := obj.Info() + if err != nil { + return moerr.NewInfoNoCtx(fmt.Sprintf("failed to get file info %v, %v", obj.Name(), err)) + } + modTime := info.ModTime() + if c.modTime != 0 { + if modTime.Unix() > c.modTime { + continue + } + } else if time.Since(modTime).Hours() < 5*24 { + continue + } + toMove = append(toMove, obj.Name()) + } + + if c.dry { + c.res = fmt.Sprintf("Dry run, to remove objects %v", len(toMove)) + return + } + + for _, obj := range toMove { + src := filepath.Join(c.oriDir, obj) + dst := filepath.Join(c.tarDir, obj) + err = os.Rename(src, 
dst) + if err != nil { + return moerr.NewInfoNoCtx(fmt.Sprintf("failed to move file %v to %v, %v", src, dst, err)) + } + } + + c.res = fmt.Sprintf("Moved objects from %v to %v, objects count %v", c.oriDir, c.tarDir, len(toMove)) + + return +} diff --git a/pkg/vm/engine/tae/rpc/tool_test.go b/pkg/vm/engine/tae/rpc/tool_test.go index ca6c731220c04..2704e6655b161 100644 --- a/pkg/vm/engine/tae/rpc/tool_test.go +++ b/pkg/vm/engine/tae/rpc/tool_test.go @@ -16,9 +16,18 @@ package rpc import ( "context" + "path" "testing" + "time" + + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/options" "github.com/stretchr/testify/assert" + + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/testutil" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/testutils" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/testutils/config" ) func Test_objGetArg(t *testing.T) { @@ -31,5 +40,83 @@ func Test_objGetArg(t *testing.T) { _, err = get.GetData(ctx) assert.Error(t, err) +} + +func Test_gcArg(t *testing.T) { + t.Skip("todo") + defer testutils.AfterTest(t)() + testutils.EnsureNoLeak(t) + + ctx := context.Background() + opts := config.WithQuickScanAndCKPOpts(nil) + options.WithCheckpointGlobalMinCount(1)(opts) + options.WithDisableGCCatalog()(opts) + options.WithCheckpointIncrementaInterval(time.Hour)(opts) + tae := testutil.NewTestEngine(ctx, ModuleName, t, opts) + defer tae.Close() + dir := path.Join(tae.Dir, "data") + dump := gcDumpArg{ + ctx: &inspectContext{ + db: tae.DB, + }, + file: path.Join(dir, "test"), + } + cmd := dump.PrepareCommand() + _ = cmd + + schema := catalog.MockSchemaAll(10, 2) + schema.Extra.BlockMaxRows = 10 + schema.Extra.ObjectMaxBlocks = 2 + tae.BindSchema(schema) + bat := catalog.MockBatch(schema, 40) + + tae.CreateRelAndAppend2(bat, true) + _, _ = tae.GetRelation() + + txn, db := tae.GetDB("db") + testutil.DropRelation2(ctx, txn, db, schema.Name) + assert.NoError(t, 
txn.Commit(context.Background())) + + assert.NoError(t, dump.Run()) + + txn, err := tae.StartTxn(nil) + assert.NoError(t, err) + tae.AllFlushExpected(tae.TxnMgr.Now(), 4000) + + tae.DB.ForceCheckpoint(ctx, tae.TxnMgr.Now()) + testutils.WaitExpect(2000, func() bool { + return tae.Runtime.Scheduler.GetPenddingLSNCnt() == 0 + }) + + tae.DB.ForceGlobalCheckpoint(ctx, txn.GetStartTS(), 5*time.Second) + testutils.WaitExpect(1000, func() bool { + return tae.Runtime.Scheduler.GetPenddingLSNCnt() == 0 + }) + + assert.NoError(t, txn.Commit(context.Background())) + + assert.NoError(t, dump.Run()) + + remove := gcRemoveArg{ + file: path.Join(dir, "test"), + oriDir: dir, + tarDir: path.Join(dir, "tar"), + modTime: time.Now().Unix(), + } + + assert.NoError(t, remove.Run()) + cmd = remove.PrepareCommand() + _ = cmd + + // to pass ci coverage + gc := GCArg{} + cmd = gc.PrepareCommand() + assert.NoError(t, gc.FromCommand(cmd)) + assert.NoError(t, dump.FromCommand(cmd)) + assert.NoError(t, remove.FromCommand(cmd)) + _ = dump.String() + _ = remove.String() + _ = gc.String() + assert.NoError(t, err) } diff --git a/pkg/vm/engine/tae/tables/jobs/flushTableTail.go b/pkg/vm/engine/tae/tables/jobs/flushTableTail.go index 849a10d5bfd79..2e20c296b2c63 100644 --- a/pkg/vm/engine/tae/tables/jobs/flushTableTail.go +++ b/pkg/vm/engine/tae/tables/jobs/flushTableTail.go @@ -32,10 +32,10 @@ import ( "github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/objectio" + "github.com/matrixorigin/matrixone/pkg/objectio/ioutil" "github.com/matrixorigin/matrixone/pkg/pb/api" "github.com/matrixorigin/matrixone/pkg/util/fault" v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" 
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers" @@ -671,7 +671,7 @@ func (task *flushTableTailTask) mergeAObjs(ctx context.Context, isTombstone bool // write! objID := objectio.NewObjectid() name := objectio.BuildObjectNameWithObjectID(objID) - writer, err := blockio.NewBlockWriterNew( + writer, err := ioutil.NewBlockWriterNew( task.rt.Fs.Service, name, schema.Version, diff --git a/pkg/vm/engine/tae/tables/jobs/flushobj.go b/pkg/vm/engine/tae/tables/jobs/flushobj.go index edb72a770ccd4..35a14cc8a213a 100644 --- a/pkg/vm/engine/tae/tables/jobs/flushobj.go +++ b/pkg/vm/engine/tae/tables/jobs/flushobj.go @@ -22,8 +22,8 @@ import ( "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/objectio" + "github.com/matrixorigin/matrixone/pkg/objectio/ioutil" "github.com/matrixorigin/matrixone/pkg/perfcounter" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers" @@ -94,7 +94,7 @@ func (task *flushObjTask) Execute(ctx context.Context) (err error) { seg := task.meta.ID().Segment() name := objectio.BuildObjectName(seg, 0) task.name = name - writer, err := blockio.NewBlockWriterNew( + writer, err := ioutil.NewBlockWriterNew( task.fs.Service, name, task.schemaVer, diff --git a/pkg/vm/engine/tae/tables/jobs/mergeobjects.go b/pkg/vm/engine/tae/tables/jobs/mergeobjects.go index 4fbe27e7281eb..9742819fb3115 100644 --- a/pkg/vm/engine/tae/tables/jobs/mergeobjects.go +++ b/pkg/vm/engine/tae/tables/jobs/mergeobjects.go @@ -30,9 +30,9 @@ import ( "github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/objectio" + "github.com/matrixorigin/matrixone/pkg/objectio/ioutil" 
"github.com/matrixorigin/matrixone/pkg/pb/api" "github.com/matrixorigin/matrixone/pkg/perfcounter" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers" @@ -306,7 +306,7 @@ func (task *mergeObjectsTask) prepareCommitEntry() *api.MergeCommitEntry { return commitEntry } -func (task *mergeObjectsTask) PrepareNewWriter() *blockio.BlockWriter { +func (task *mergeObjectsTask) PrepareNewWriter() *ioutil.BlockWriter { seqnums := make([]uint16, 0, len(task.schema.ColDefs)-1) for _, def := range task.schema.ColDefs { if def.IsPhyAddr() { @@ -327,7 +327,7 @@ func (task *mergeObjectsTask) PrepareNewWriter() *blockio.BlockWriter { sortkeyPos = task.schema.GetSingleSortKeyIdx() } - writer := blockio.ConstructWriterWithSegmentID( + writer := ioutil.ConstructWriterWithSegmentID( task.segmentID, task.num, task.schema.Version, diff --git a/pkg/vm/engine/tae/txn/txnimpl/table.go b/pkg/vm/engine/tae/txn/txnimpl/table.go index 44c74469e39e4..9255296d90fec 100644 --- a/pkg/vm/engine/tae/txn/txnimpl/table.go +++ b/pkg/vm/engine/tae/txn/txnimpl/table.go @@ -32,11 +32,11 @@ import ( "github.com/matrixorigin/matrixone/pkg/fileservice" "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/objectio" + "github.com/matrixorigin/matrixone/pkg/objectio/ioutil" apipb "github.com/matrixorigin/matrixone/pkg/pb/api" "github.com/matrixorigin/matrixone/pkg/perfcounter" "github.com/matrixorigin/matrixone/pkg/util" v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2" - "github.com/matrixorigin/matrixone/pkg/vm/engine/engine_util" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" @@ -264,7 +264,7 @@ func (tbl *txnTable) TransferDeletes( 
v2.TxnS3TombstoneTransferGetSoftdeleteObjectsHistogram.Observe(time.Since(tGetSoftdeleteObjects).Seconds()) v2.TxnS3TombstoneSoftdeleteObjectCounter.Add(float64(len(softDeleteObjects))) var findTombstoneDuration, readTombstoneDuration, deleteRowsDuration time.Duration - var sinker *engine_util.Sinker + var sinker *ioutil.Sinker defer func() { if sinker != nil { sinker.Close() @@ -362,13 +362,13 @@ func (tbl *txnTable) TransferDeletes( tbl.store.warChecker.Delete(id) if currentTransferBatch != nil { if sinker == nil { - sinker = engine_util.NewTombstoneSinker( + sinker = ioutil.NewTombstoneSinker( objectio.HiddenColumnSelection_None, *pkType, common.WorkspaceAllocator, tbl.store.rt.Fs.Service, - engine_util.WithBufferSizeCap(TransferSinkerBufferSize), - engine_util.WithMemorySizeThreshold(TransferSinkerMemorySizeThreshold)) + ioutil.WithBufferSizeCap(TransferSinkerBufferSize), + ioutil.WithMemorySizeThreshold(TransferSinkerMemorySizeThreshold)) } sinker.Write(ctx, containers.ToCNBatch(currentTransferBatch)) currentTransferBatch.Close() diff --git a/pkg/vm/engine/test/sinker_test.go b/pkg/vm/engine/test/sinker_test.go index 745404fba19be..450b0d7d1bc4b 100644 --- a/pkg/vm/engine/test/sinker_test.go +++ b/pkg/vm/engine/test/sinker_test.go @@ -25,12 +25,13 @@ import ( "github.com/matrixorigin/matrixone/pkg/defines" "github.com/matrixorigin/matrixone/pkg/fileservice" "github.com/matrixorigin/matrixone/pkg/objectio" + "github.com/matrixorigin/matrixone/pkg/objectio/ioutil" + "github.com/matrixorigin/matrixone/pkg/objectio/mergeutil" "github.com/matrixorigin/matrixone/pkg/pb/timestamp" testutil3 "github.com/matrixorigin/matrixone/pkg/testutil" "github.com/matrixorigin/matrixone/pkg/vm/engine/engine_util" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/mergesort" "github.com/stretchr/testify/require" ) @@ -41,13 +42,13 @@ func Test_Sinker(t 
*testing.T) { fs, err := fileservice.Get[fileservice.FileService](proc.GetFileService(), defines.SharedFileServiceName) require.NoError(t, err) - sinker1 := engine_util.NewTombstoneSinker( + sinker1 := ioutil.NewTombstoneSinker( objectio.HiddenColumnSelection_None, pkType, mp, fs, // engine_util.WithDedupAll(), - engine_util.WithMemorySizeThreshold(mpool.KB*400), + ioutil.WithMemorySizeThreshold(mpool.KB*400), ) blkCnt := 5 @@ -136,13 +137,13 @@ func Test_Sinker(t *testing.T) { } buffer.Clean(mp) require.Equal(t, bat1.RowCount(), bat2.RowCount()) - err = mergesort.SortColumnsByIndex( + err = mergeutil.SortColumnsByIndex( bat1.Vecs, objectio.TombstonePrimaryKeyIdx, mp, ) require.NoError(t, err) - err = mergesort.SortColumnsByIndex( + err = mergeutil.SortColumnsByIndex( bat2.Vecs, objectio.TombstonePrimaryKeyIdx, mp,