Skip to content

Commit

Permalink
Merge branch 'main' into t8
Browse files Browse the repository at this point in the history
  • Loading branch information
huby2358 authored Jan 2, 2025
2 parents b1e20c0 + eb564ec commit 238b6fe
Show file tree
Hide file tree
Showing 56 changed files with 988 additions and 533 deletions.
8 changes: 7 additions & 1 deletion pkg/datasync/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1070,8 +1070,14 @@ func TestCompleteData(t *testing.T) {
err := c.completeData(ctx)
assert.NoError(t, err)
files, err := fileservice.SortedList(c.dstFS.List(ctx, ""))
var count int
for i := range files {
if !files[i].IsDir && files[i].Size < 10 {
count++
}
}
assert.NoError(t, err)
assert.Equal(t, 35+(150-102), len(files))
assert.Equal(t, 150-102, count)
requiredLsn, err := c.logClient.getRequiredLsn(ctx)
assert.NoError(t, err)
assert.Equal(t, uint64(103+(150-102)-1), requiredLsn)
Expand Down
4 changes: 2 additions & 2 deletions pkg/frontend/compiler_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/common/pubsub"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/defines"
"github.com/matrixorigin/matrixone/pkg/objectio/ioutil"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
pb "github.com/matrixorigin/matrixone/pkg/pb/statsinfo"
"github.com/matrixorigin/matrixone/pkg/perfcounter"
Expand All @@ -44,7 +45,6 @@ import (
v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2"
"github.com/matrixorigin/matrixone/pkg/util/trace/impl/motrace/statistic"
"github.com/matrixorigin/matrixone/pkg/vm/engine"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"
"github.com/matrixorigin/matrixone/pkg/vm/process"
)
Expand Down Expand Up @@ -1009,7 +1009,7 @@ func (tcc *TxnCompilerContext) GetQueryResultMeta(uuid string) ([]*plan.ColDef,
// get file size
path := catalog.BuildQueryResultMetaPath(proc.GetSessionInfo().Account, uuid)
// read meta's meta
reader, err := blockio.NewFileReader(proc.GetService(), proc.Base.FileService, path)
reader, err := ioutil.NewFileReader(proc.Base.FileService, path)
if err != nil {
return nil, "", err
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/frontend/query_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ import (
"github.com/matrixorigin/matrixone/pkg/fileservice"
"github.com/matrixorigin/matrixone/pkg/frontend/constant"
"github.com/matrixorigin/matrixone/pkg/objectio"
"github.com/matrixorigin/matrixone/pkg/objectio/ioutil"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/perfcounter"
"github.com/matrixorigin/matrixone/pkg/sql/colexec"
"github.com/matrixorigin/matrixone/pkg/sql/parsers/tree"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio"
"github.com/matrixorigin/matrixone/pkg/vm/process"
)

Expand Down Expand Up @@ -362,7 +362,7 @@ func checkPrivilege(sid string, uuids []string, reqCtx context.Context, ses *Ses
for _, id := range uuids {
// var size int64 = -1
path := catalog.BuildQueryResultMetaPath(ses.GetTenantInfo().GetTenant(), id)
reader, err := blockio.NewFileReader(sid, f, path)
reader, err := ioutil.NewFileReader(f, path)
if err != nil {
return err
}
Expand Down Expand Up @@ -553,7 +553,7 @@ type resultFileInfo struct {
func doDumpQueryResult(ctx context.Context, ses *Session, eParam *tree.ExportParam) error {
var err error
var columnDefs *plan.ResultColDef
var reader *blockio.BlockReader
var reader *ioutil.BlockReader
var blocks []objectio.BlockObject
var files []resultFileInfo

Expand Down Expand Up @@ -690,7 +690,7 @@ func openResultMeta(ctx context.Context, ses *Session, queryId string) (*plan.Re
}
metaFile := catalog.BuildQueryResultMetaPath(account.GetTenant(), queryId)
// read meta's meta
reader, err := blockio.NewFileReader(ses.service, getPu(ses.GetService()).FileService, metaFile)
reader, err := ioutil.NewFileReader(getPu(ses.GetService()).FileService, metaFile)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -748,10 +748,10 @@ func getResultFiles(ctx context.Context, ses *Session, queryId string) ([]result
}

// openResultFile reads all blocks of the result file
func openResultFile(ctx context.Context, ses *Session, fileName string, fileSize int64) (*blockio.BlockReader, []objectio.BlockObject, error) {
func openResultFile(ctx context.Context, ses *Session, fileName string, fileSize int64) (*ioutil.BlockReader, []objectio.BlockObject, error) {
// read result's blocks
filePath := getPathOfQueryResultFile(fileName)
reader, err := blockio.NewFileReader(ses.GetService(), getPu(ses.GetService()).FileService, filePath)
reader, err := ioutil.NewFileReader(getPu(ses.GetService()).FileService, filePath)
if err != nil {
return nil, nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package blockio
package ioutil

import (
"context"

"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
Expand All @@ -24,28 +25,11 @@ import (
"github.com/matrixorigin/matrixone/pkg/objectio"
)

const (
AsyncIo = 1
SyncIo = 2
)

var IoModel = SyncIo

type BlockReader struct {
reader *objectio.ObjectReader
aio *IoPipeline
}

type fetchParams struct {
idxes []uint16
typs []types.Type
blk uint16
pool *mpool.MPool
reader *objectio.ObjectReader
}

func NewObjectReader(
sid string,
service fileservice.FileService,
key objectio.Location,
opts ...objectio.ReaderOptionFunc,
Expand All @@ -68,12 +52,10 @@ func NewObjectReader(
}
return &BlockReader{
reader: reader,
aio: GetPipeline(sid),
}, nil
}

func NewFileReader(
sid string,
service fileservice.FileService,
name string,
) (*BlockReader, error) {
Expand All @@ -86,7 +68,6 @@ func NewFileReader(
}
return &BlockReader{
reader: reader,
aio: GetPipeline(sid),
}, nil
}

Expand Down Expand Up @@ -117,24 +98,9 @@ func (r *BlockReader) LoadColumns(
return
}
var ioVectors fileservice.IOVector
if IoModel == AsyncIo {
proc := fetchParams{
idxes: cols,
blk: blk,
typs: typs,
pool: m,
reader: r.reader,
}
var v any
if v, err = r.aio.Fetch(ctx, proc); err != nil {
return
}
ioVectors = v.(fileservice.IOVector)
} else {
ioVectors, err = r.reader.ReadOneBlock(ctx, cols, typs, blk, m)
if err != nil {
return
}
ioVectors, err = r.reader.ReadOneBlock(ctx, cols, typs, blk, m)
if err != nil {
return
}
release = func() {
objectio.ReleaseIOVector(&ioVectors)
Expand Down Expand Up @@ -339,29 +305,3 @@ func (r *BlockReader) GetName() string {
func (r *BlockReader) GetObjectReader() *objectio.ObjectReader {
return r.reader
}

func Prefetch(
sid string,
service fileservice.FileService,
key objectio.Location,
) error {
params, err := BuildPrefetchParams(service, key)
if err != nil {
return err
}
params.typ = PrefetchFileType
return MustGetPipeline(sid).Prefetch(params)
}

func PrefetchMeta(
sid string,
service fileservice.FileService,
key objectio.Location,
) error {
params, err := BuildPrefetchParams(service, key)
if err != nil {
return err
}
params.typ = PrefetchMetaType
return MustGetPipeline(sid).Prefetch(params)
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package engine_util
package ioutil

import (
"context"
Expand All @@ -23,10 +23,8 @@ import (
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/fileservice"
"github.com/matrixorigin/matrixone/pkg/objectio"
"github.com/matrixorigin/matrixone/pkg/sql/colexec"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio"
"github.com/matrixorigin/matrixone/pkg/objectio/mergeutil"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/mergesort"
)

const DefaultInMemoryStagedSize = mpool.MB * 16
Expand Down Expand Up @@ -80,7 +78,7 @@ type FileSinker interface {
var _ FileSinker = new(FSinkerImpl)

type FSinkerImpl struct {
writer *blockio.BlockWriter
writer *BlockWriter
mp *mpool.MPool
fs fileservice.FileService

Expand All @@ -95,12 +93,12 @@ type FSinkerImpl struct {
func (s *FSinkerImpl) Sink(ctx context.Context, b *batch.Batch) error {
if s.writer == nil {
if s.isTombstone {
s.writer = blockio.ConstructTombstoneWriter(
s.writer = ConstructTombstoneWriter(
s.hiddenSelection,
s.fs,
)
} else {
s.writer = blockio.ConstructWriter(
s.writer = ConstructWriter(
s.schemaVersion,
s.seqnums,
s.sortKeyPos,
Expand Down Expand Up @@ -273,20 +271,20 @@ func NewSinker(
return sinker
}

type stats struct {
type sinkerStats struct {
Name string
HighWatermarkCnt uint64
HighWatermarkBytes uint64
CurrentCnt uint64
CurrentBytes uint64
}

func (s *stats) String() string {
func (s *sinkerStats) String() string {
return fmt.Sprintf("%s, high cnt: %d, current cnt: %d, hight bytes: %d, current bytes: %d",
s.Name, s.HighWatermarkCnt, s.CurrentCnt, s.HighWatermarkBytes, s.CurrentBytes)
}

func (s *stats) updateCount(n int) {
func (s *sinkerStats) updateCount(n int) {
if n > 0 {
s.CurrentCnt += uint64(n)
} else if n < 0 {
Expand All @@ -298,7 +296,7 @@ func (s *stats) updateCount(n int) {
}
}

func (s *stats) updateBytes(n int) {
func (s *sinkerStats) updateBytes(n int) {
if n > 0 {
s.CurrentBytes += uint64(n)
} else if n < 0 {
Expand Down Expand Up @@ -326,7 +324,7 @@ type Sinker struct {
factory FileSinkerFactory
}
staged struct {
inMemStats stats
inMemStats sinkerStats
inMemory []*batch.Batch
persisted []objectio.ObjectStats
inMemorySize int
Expand All @@ -339,7 +337,7 @@ type Sinker struct {

buf struct {
isOwner bool
bufStats stats
bufStats sinkerStats
buffers *containers.OneSchemaBatchBuffer
}

Expand Down Expand Up @@ -443,7 +441,7 @@ func (sinker *Sinker) trySortInMemoryStaged(ctx context.Context) error {
return nil
}
for _, bat := range sinker.staged.inMemory {
if err := mergesort.SortColumnsByIndex(
if err := mergeutil.SortColumnsByIndex(
bat.Vecs,
sinker.schema.sortKeyIdx,
sinker.mp,
Expand Down Expand Up @@ -487,7 +485,7 @@ func (sinker *Sinker) trySpill(ctx context.Context) error {
if sinker.schema.sortKeyIdx != -1 {
buffer := sinker.fetchBuffer() // note the lifecycle of buffer
defer sinker.putBackBuffer(buffer)
if err := colexec.MergeSortBatches(
if err := mergeutil.MergeSortBatches(
sinker.staged.inMemory,
sinker.schema.sortKeyIdx,
buffer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package engine_util
package ioutil

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package blockio
package ioutil

import (
hll "github.com/axiomhq/hyperloglog"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package blockio
package ioutil

import (
"context"
Expand Down
Loading

0 comments on commit 238b6fe

Please sign in to comment.