
Commit

reimplement checkpoint storage for future big data processing - 1 (#21056)

reimplement checkpoint storage 1 for future big data processing

Approved by: @LeftHandCold
XuPeng-SH authored Jan 2, 2025
1 parent efb4708 commit 34949a2
Showing 8 changed files with 333 additions and 4 deletions.
6 changes: 6 additions & 0 deletions pkg/container/types/packer.go
@@ -292,6 +292,12 @@ func (p *Packer) EncodeUuid(e Uuid) {
	p.putBytes(e[:])
}

// EncodeObjectid writes the object-id typecode, the 16-byte segment id,
// and the uint-encoded 2-byte offset.
func (p *Packer) EncodeObjectid(e *Objectid) {
	p.putByte(objectIdCode)
	p.putBytes(e[:SegmentidSize])
	p.encodeUint(uint64(e.Offset()))
}

func (p *Packer) GetBuf() []byte {
	return p.buffer
}
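For orientation, a minimal sketch of what the new encoder produces; types.NewPacker, types.NewObjectid, and packer.Bytes are all taken from the tests further down, and the layout comment simply restates EncodeObjectid above:

package main

import (
	"fmt"

	"github.com/matrixorigin/matrixone/pkg/container/types"
)

func main() {
	packer := types.NewPacker()
	defer packer.Close()

	obj := types.NewObjectid()
	packer.EncodeObjectid(obj)

	// Per EncodeObjectid: one typecode byte (objectIdCode, 0x53),
	// the 16-byte segment id, then the uint-encoded offset.
	buf := packer.Bytes()
	fmt.Printf("encoded %d bytes, typecode %#02x\n", len(buf), buf[0])
}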
7 changes: 7 additions & 0 deletions pkg/container/types/rowid.go
@@ -21,6 +21,7 @@ import (
"unsafe"

"github.com/google/uuid"
"github.com/matrixorigin/matrixone/pkg/common/util"
)

/* … */

@@ -359,3 +360,9 @@ func (o *Objectid) LT(other *Objectid) bool {
func (o *Objectid) GT(other *Objectid) bool {
	return o.Compare(other) > 0
}

// Copy returns a new Objectid that shares o's segment id but carries the
// given offset in the trailing 2 bytes.
func (o *Objectid) Copy(offset uint16) Objectid {
	ret := *o
	copy(ret[SegmentidSize:], util.UnsafeToBytes(&offset))
	return ret
}
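A quick sketch of Copy in use (mirroring Test_ClusterKey2 below; NewObjectid and Offset come from this same package):

package main

import (
	"fmt"

	"github.com/matrixorigin/matrixone/pkg/container/types"
)

func main() {
	template := types.NewObjectid()

	// a and b share template's segment id; only the trailing
	// 2-byte offset differs, so they stay adjacent when sorted.
	a := template.Copy(1)
	b := template.Copy(2)
	fmt.Println(a.Offset(), b.Offset()) // 1 2
}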
15 changes: 15 additions & 0 deletions pkg/container/types/tuple.go
@@ -37,6 +37,7 @@ import (
"unsafe"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/common/util"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
)

@@ -213,6 +214,7 @@ const (
	enumCode     = 0x50 // TODO: reorder the list to put timeCode next to date type code?
	bitCode      = 0x51
	uuidCode     = 0x52
	objectIdCode = 0x53
)

var sizeLimits = []uint64{ /* … */ }

@@ -426,6 +428,16 @@ func decodeUuid(b []byte) (Uuid, int) {
	return ret, 17
}

// decodeObjectid reverses Packer.EncodeObjectid: decodeUuid consumes the
// typecode byte plus the 16-byte segment id, then the uint16 offset is
// decoded and the two parts are reassembled into an Objectid.
func decodeObjectid(b []byte) (Objectid, int) {
	var ret Objectid
	segid, pos1 := decodeUuid(b)
	offset, pos2 := decodeUint(uint16Code, b[pos1:])
	u16 := offset.(uint16)
	copy(ret[:], segid[:])
	copy(ret[UuidSize:], util.UnsafeToBytes(&u16))
	return ret, pos1 + pos2
}

var DecodeTuple = decodeTuple

func decodeTuple(b []byte) (Tuple, int, []T, error) {
@@ -527,6 +539,9 @@
			schema = append(schema, T_uuid)
			el, off = decodeUuid(b[i:])
			// off += 1
		case b[i] == objectIdCode:
			schema = append(schema, T_Objectid)
			el, off = decodeObjectid(b[i:])
		default:
			return nil, i, nil, moerr.NewInternalErrorNoCtxf("unable to decode tuple element with unknown typecode %02x", b[i])
		}
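Putting the two halves together, a round-trip sketch: EncodeObjectid on the write side, DecodeTuple on the read side (all identifiers are confirmed by Test_ClusterKey1 below):

package main

import (
	"fmt"

	"github.com/matrixorigin/matrixone/pkg/container/types"
)

func main() {
	packer := types.NewPacker()
	defer packer.Close()

	obj := types.NewObjectid()
	packer.EncodeObjectid(obj)

	tuple, _, schemas, err := types.DecodeTuple(packer.Bytes())
	if err != nil {
		panic(err)
	}

	// A single element typed T_Objectid that equals the original id.
	decoded := tuple[0].(types.Objectid)
	fmt.Println(schemas[0] == types.T_Objectid, obj.EQ(&decoded)) // true true
}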
9 changes: 6 additions & 3 deletions pkg/container/types/types.go
@@ -81,9 +81,10 @@
	T_datalink T = 72

	// Transaction TS
-	T_TS      T = 100
-	T_Rowid   T = 101
-	T_Blockid T = 102
+	T_TS       T = 100
+	T_Rowid    T = 101
+	T_Blockid  T = 102
+	T_Objectid T = 103

	// system family
	T_tuple T = 201
@@ -708,6 +709,8 @@ func (t T) String() string {
return "DATALINK"
case T_TS:
return "TRANSACTION TIMESTAMP"
case T_Objectid:
return "OBJECTID"
case T_Rowid:
return "ROWID"
case T_uuid:
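Nothing subtle here, but for completeness: the new id slots into the transaction-TS block, and the String case maps it to a readable name. A one-line check (the int conversion avoids assuming T's underlying integer type):

package main

import (
	"fmt"

	"github.com/matrixorigin/matrixone/pkg/container/types"
)

func main() {
	fmt.Println(int(types.T_Objectid), types.T_Objectid.String()) // 103 OBJECTID
}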
61 changes: 61 additions & 0 deletions pkg/objectio/ckputil/sinker.go
@@ -0,0 +1,61 @@
// Copyright 2021 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ckputil

import (
	"github.com/matrixorigin/matrixone/pkg/common/mpool"
	"github.com/matrixorigin/matrixone/pkg/container/types"
	"github.com/matrixorigin/matrixone/pkg/fileservice"
	"github.com/matrixorigin/matrixone/pkg/objectio"
	"github.com/matrixorigin/matrixone/pkg/objectio/ioutil"
)

// EncodeCluser packs (tableId, objectId) into a byte-comparable cluster key:
// the table id comes first, so keys sort by table and then by object.
func EncodeCluser(
	packer *types.Packer,
	tableId uint64,
	obj *objectio.ObjectId,
) {
	packer.EncodeUint64(tableId)
	packer.EncodeObjectid(obj)
}

var SinkerFactory ioutil.FileSinkerFactory

func init() {
	SinkerFactory = ioutil.NewFSinkerImplFactory(
		TableObjectsSeqnums,
		TableObjectsAttr_Cluster_Idx,
		false,
		false,
		0,
	)
}

func NewSinker(
	mp *mpool.MPool,
	fs fileservice.FileService,
	opts ...ioutil.SinkerOption,
) *ioutil.Sinker {
	opts = append(opts, ioutil.WithTailSizeCap(0))
	return ioutil.NewSinker(
		TableObjectsAttr_Cluster_Idx,
		TableObjectsAttrs,
		TableObjectsTypes,
		SinkerFactory,
		mp,
		fs,
		opts...,
	)
}
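Because EncodeCluser writes the table id ahead of the object id, byte-wise comparison of the packed keys orders rows by table first and by object within a table, which is the ordering the sinker sorts on via TableObjectsAttr_Cluster_Idx. A minimal sketch of driving the sinker, under the same setup as Test_Sinker1 below; flushObjectList and nextBatch are hypothetical names for this illustration:

func flushObjectList(
	ctx context.Context,
	mp *mpool.MPool,
	fs fileservice.FileService,
	nextBatch func() *batch.Batch, // placeholder source of filled batches
) error {
	sinker := NewSinker(mp, fs, ioutil.WithMemorySizeThreshold(mpool.KB))
	defer sinker.Close()

	for i := 0; i < 5; i++ {
		if err := sinker.Write(ctx, nextBatch()); err != nil {
			return err
		}
	}
	// Sync flushes any in-memory tail, so GetResult reports
	// persisted files only.
	if err := sinker.Sync(ctx); err != nil {
		return err
	}
	files, inMems := sinker.GetResult()
	_ = files  // persisted object stats
	_ = inMems // empty after Sync
	return nil
}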
173 changes: 173 additions & 0 deletions pkg/objectio/ckputil/sinker_test.go
@@ -0,0 +1,173 @@
// Copyright 2021 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ckputil

import (
	"bytes"
	"context"
	"sort"
	"testing"

	"github.com/matrixorigin/matrixone/pkg/common/mpool"
	"github.com/matrixorigin/matrixone/pkg/container/batch"
	"github.com/matrixorigin/matrixone/pkg/container/types"
	"github.com/matrixorigin/matrixone/pkg/container/vector"
	"github.com/matrixorigin/matrixone/pkg/defines"
	"github.com/matrixorigin/matrixone/pkg/fileservice"
	"github.com/matrixorigin/matrixone/pkg/objectio"
	"github.com/matrixorigin/matrixone/pkg/objectio/ioutil"
	"github.com/matrixorigin/matrixone/pkg/testutil"
	"github.com/stretchr/testify/require"
)

func Test_ClusterKey1(t *testing.T) {
	packer := types.NewPacker()
	defer packer.Close()

	tableId := uint64(20)
	obj := types.NewObjectid()

	EncodeCluser(packer, tableId, obj)

	buf := packer.Bytes()
	packer.Reset()

	tuple, _, schemas, err := types.DecodeTuple(buf)
	require.NoError(t, err)
	require.Equalf(t, 2, len(schemas), "schemas: %v", schemas)
	require.Equalf(t, types.T_uint64, schemas[0], "schemas: %v", schemas)
	require.Equalf(t, types.T_Objectid, schemas[1], "schemas: %v", schemas)

	t.Log(tuple.SQLStrings(nil))
	require.Equal(t, tableId, tuple[0].(uint64))
	oid := tuple[1].(types.Objectid)
	require.True(t, obj.EQ(&oid))
}

func Test_ClusterKey2(t *testing.T) {
	packer := types.NewPacker()
	defer packer.Close()
	cnt := 5000
	clusters := make([][]byte, 0, cnt)
	objTemplate := types.NewObjectid()
	for i := cnt; i >= 1; i-- {
		obj := objTemplate.Copy(uint16(i))
		EncodeCluser(packer, 1, &obj)
		clusters = append(clusters, packer.Bytes())
		packer.Reset()
	}
	sort.Slice(clusters, func(i, j int) bool {
		return bytes.Compare(clusters[i], clusters[j]) < 0
	})

	last := uint16(0)
	for _, cluster := range clusters {
		tuple, _, _, err := types.DecodeTuple(cluster)
		require.NoError(t, err)
		require.Equalf(t, 2, len(tuple), "%v", tuple)
		require.Equal(t, uint64(1), tuple[0].(uint64))
		obj := tuple[1].(types.Objectid)
		curr := obj.Offset()
		require.Truef(t, curr > last, "%v,%v", curr, last)
		last = curr
	}
}

func Test_Sinker1(t *testing.T) {
	proc := testutil.NewProc()
	fs, err := fileservice.Get[fileservice.FileService](
		proc.GetFileService(), defines.SharedFileServiceName,
	)
	require.NoError(t, err)
	mp := proc.Mp()

	bat := NewObjectListBatch()
	accountId := uint32(1)
	mapping := map[uint64][]uint64{
		1: {41, 31, 21, 11, 1},
		2: {42, 32, 22, 12, 2},
		3: {43, 33, 23, 13, 3},
	}
	dbs := []uint64{1, 2, 3}

	sinker := NewSinker(
		mp,
		fs,
		ioutil.WithMemorySizeThreshold(mpool.KB),
	)
	defer sinker.Close()

	packer := types.NewPacker()
	defer packer.Close()

	fillNext := func(data *batch.Batch, rows int) {
		data.CleanOnlyData()
		for i, vec := range data.Vecs {
			if i == TableObjectsAttr_Accout_Idx {
				for j := 0; j < rows; j++ {
					require.NoError(t, vector.AppendMultiFixed(vec, accountId, false, rows, mp))
				}
			} else if i == TableObjectsAttr_DB_Idx {
				tableVec := data.Vecs[TableObjectsAttr_Table_Idx]
				idVec := data.Vecs[TableObjectsAttr_ID_Idx]
				clusterVec := data.Vecs[TableObjectsAttr_Cluster_Idx]
				for j := 0; j < rows; j++ {
					dbid := dbs[j%len(dbs)]
					tables := mapping[dbid]
					tableid := tables[j%len(tables)]

					var obj objectio.ObjectStats
					objname := objectio.MockObjectName()
					objectio.SetObjectStatsObjectName(&obj, objname)
					packer.Reset()
					EncodeCluser(packer, tableid, objname.ObjectId())

					require.NoError(t, vector.AppendFixed(vec, dbid, false, mp))
					require.NoError(t, vector.AppendFixed(tableVec, tableid, false, mp))
					require.NoError(t, vector.AppendBytes(idVec, []byte(objname), false, mp))
					require.NoError(t, vector.AppendBytes(clusterVec, packer.Bytes(), false, mp))
				}
			} else if i == TableObjectsAttr_CreateTS_Idx {
				for j := 0; j < rows; j++ {
					require.NoError(t, vector.AppendFixed(vec, types.NextGlobalTsForTest(), false, mp))
				}
			} else if i == TableObjectsAttr_DeleteTS_Idx {
				for j := 0; j < rows; j++ {
					require.NoError(t, vector.AppendFixed(vec, types.NextGlobalTsForTest(), false, mp))
				}
			}
		}
		data.SetRowCount(rows)
	}

	ctx := context.Background()

	rows := 0
	for i := 0; i < 5; i++ {
		fillNext(bat, 100)
		require.NoError(t, sinker.Write(ctx, bat))
		rows += 100
	}
	require.NoError(t, sinker.Sync(ctx))
	files, inMems := sinker.GetResult()
	require.Equal(t, 0, len(inMems))
	totalRows := 0
	for _, file := range files {
		t.Log(file.String())
		totalRows += int(file.Rows())
	}
	require.Equal(t, 5, len(files))
	require.Equal(t, rows, totalRows)
}