Skip to content

Commit

Permalink
[feature] datasync: add gc meta files to WAL (#19216)
Browse files Browse the repository at this point in the history
add gc meta files to WAL

Approved by: @XuPeng-SH, @sukki37
  • Loading branch information
volgariver6 authored Oct 11, 2024
1 parent f21ee7f commit 2e35c5e
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 17 deletions.
8 changes: 6 additions & 2 deletions pkg/datasync/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package datasync

import (
"bytes"

"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/logutil"
Expand Down Expand Up @@ -72,12 +73,13 @@ func getLocations(rec logservice.LogRecord) []string {
if ei.Group == wal.GroupPrepare {
locations = append(locations, parseCommonFiles(payload)...)
} else if ei.Group == store.GroupFiles {
locations = append(locations, parseCheckpointFiles(payload)...)
locations = append(locations, parseMetaFiles(payload)...)
}
}
return locations
}

// parseCommonFiles parses the common files which are in the root directory.
func parseCommonFiles(payload []byte) []string {
if len(payload) < entryHeaderSize {
return nil
Expand Down Expand Up @@ -118,7 +120,8 @@ func parseCommonFiles(payload []byte) []string {
return locations
}

func parseCheckpointFiles(payload []byte) []string {
// parseMetaFiles parses the meta files located in the ckp/ or gc/ directory.
func parseMetaFiles(payload []byte) []string {
vec := vector.NewVec(types.Type{})
if err := vec.UnmarshalBinary(payload); err != nil {
logutil.Errorf("failed to unmarshal checkpoint file: %v", err)
Expand All @@ -127,6 +130,7 @@ func parseCheckpointFiles(payload []byte) []string {
var locations []string
for i := 0; i < vec.Length(); i++ {
file := vec.GetStringAt(i)
logutil.Infof("parsed meta file: %s", file)
locations = append(locations, file)
}
return locations
Expand Down
2 changes: 2 additions & 0 deletions pkg/vm/engine/tae/db/checkpoint/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package checkpoint

import (
"context"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/wal"
"time"

"github.com/matrixorigin/matrixone/pkg/container/types"
Expand All @@ -38,6 +39,7 @@ type RunnerReader interface {
GetCheckpointMetaFiles() map[string]struct{}
RemoveCheckpointMetaFile(string)
AddCheckpointMetaFile(string)
GetDriver() wal.Driver
}

func (r *runner) collectCheckpointMetadata(start, end types.TS, ckpLSN, truncateLSN uint64) *containers.Batch {
Expand Down
4 changes: 4 additions & 0 deletions pkg/vm/engine/tae/db/checkpoint/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,10 @@ func (r *runner) AddCheckpointMetaFile(name string) {
r.checkpointMetaFiles.files[name] = struct{}{}
}

// GetDriver returns the WAL driver held by this runner (its wal field).
func (r *runner) GetDriver() wal.Driver {
return r.wal
}

func (r *runner) RemoveCheckpointMetaFile(name string) {
r.checkpointMetaFiles.Lock()
defer r.checkpointMetaFiles.Unlock()
Expand Down
44 changes: 40 additions & 4 deletions pkg/vm/engine/tae/db/gc/v2/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/checkpoint"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logstore/store"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/testutils"
"go.uber.org/zap"
Expand Down Expand Up @@ -478,7 +479,7 @@ func (c *checkpointCleaner) mergeGCFile() error {
mergeTable = c.inputs.tables[0]
}
c.inputs.RUnlock()
_, err = mergeTable.SaveFullTable(maxConsumed.GetStart(), maxConsumed.GetEnd(), c.fs, nil)
name, err := mergeTable.SaveFullTable(maxConsumed.GetStart(), maxConsumed.GetEnd(), c.fs)
if err != nil {
logutil.Errorf("SaveTable failed: %v", err.Error())
return err
Expand All @@ -489,6 +490,11 @@ func (c *checkpointCleaner) mergeGCFile() error {
return err
}
c.updateMinMerged(maxConsumed)

if err := c.appendFilesToWAL(name); err != nil {
logutil.Errorf("append gc file to WAL failed: %v", err)
return err
}
return nil
}

Expand Down Expand Up @@ -1018,6 +1024,7 @@ func (c *checkpointCleaner) createNewInput(
zap.String("snapshots", c.snapshotMeta.String()))
}()
var data *logtail.CheckpointData
var gcFiles []string
for _, candidate := range ckps {
startts, endts := candidate.GetStart(), candidate.GetEnd()
data, err = c.collectCkpData(candidate)
Expand All @@ -1037,27 +1044,56 @@ func (c *checkpointCleaner) createNewInput(
logutil.Errorf("SaveMeta is failed")
return
}
if snapSize > 0 {
gcFiles = append(gcFiles, name)
}
name = blockio.EncodeTableMetadataFileName(GCMetaDir,
PrefixAcctMeta, ckps[0].GetStart(), ckps[len(ckps)-1].GetEnd())
tableSize, err = c.snapshotMeta.SaveTableInfo(name, c.fs.Service)
if err != nil {
logutil.Errorf("SaveTableInfo is failed")
return
}
files := c.GetAndClearOutputs()
_, err = input.SaveTable(
if tableSize > 0 {
gcFiles = append(gcFiles, name)
}

name, err = input.SaveTable(
ckps[0].GetStart(),
ckps[len(ckps)-1].GetEnd(),
c.fs,
files,
)
if err != nil {
return
}
gcFiles = append(gcFiles, name)

err = c.appendFilesToWAL(gcFiles...)
if err != nil {
logutil.Errorf("append gc files to WAL failed: %v", err)
return
}

return
}

// appendFilesToWAL appends the given GC meta file names to the WAL as one
// files entry in the store.GroupFiles group.
//
// If the checkpoint client has no driver, nothing is appended and nil is
// returned. Otherwise any error from building or appending the entry is
// returned to the caller unchanged.
func (c *checkpointCleaner) appendFilesToWAL(files ...string) error {
	walDriver := c.ckpClient.GetDriver()
	if walDriver == nil {
		return nil
	}

	filesEntry, err := store.BuildFilesEntry(files)
	if err != nil {
		return err
	}

	// Only the error of the append is of interest; the first result is dropped.
	_, err = walDriver.AppendEntry(store.GroupFiles, filesEntry)
	return err
}

func (c *checkpointCleaner) updateSnapshot(
ctx context.Context,
fs fileservice.FileService,
Expand Down
24 changes: 14 additions & 10 deletions pkg/vm/engine/tae/db/gc/v2/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,27 +275,31 @@ func (t *GCTable) collectData() []*containers.Batch {
return bats
}

// SaveTable is to write data to s3
func (t *GCTable) SaveTable(start, end types.TS, fs *objectio.ObjectFS, files []string) ([]objectio.BlockObject, error) {
// SaveTable is to write data to s3.
// It returns the gc meta file name and the error.
func (t *GCTable) SaveTable(start, end types.TS, fs *objectio.ObjectFS) (string, error) {
bats := t.collectData()
defer t.closeBatch(bats)
name := blockio.EncodeCheckpointMetadataFileName(GCMetaDir, PrefixGCMeta, start, end)
writer, err := objectio.NewObjectWriterSpecial(objectio.WriterGC, name, fs.Service)
if err != nil {
return nil, err
return "", err
}
for i := range bats {
if _, err := writer.WriteWithoutSeqnum(containers.ToCNBatch(bats[i])); err != nil {
return nil, err
return "", err
}
}

blocks, err := writer.WriteEnd(context.Background())
return blocks, err
_, err = writer.WriteEnd(context.Background())
if err != nil {
return "", err
}
return name, nil
}

// SaveFullTable is to write data to s3
func (t *GCTable) SaveFullTable(start, end types.TS, fs *objectio.ObjectFS, files []string) ([]objectio.BlockObject, error) {
func (t *GCTable) SaveFullTable(start, end types.TS, fs *objectio.ObjectFS) (string, error) {
now := time.Now()
var bats []*containers.Batch
var blocks []objectio.BlockObject
Expand Down Expand Up @@ -330,17 +334,17 @@ func (t *GCTable) SaveFullTable(start, end types.TS, fs *objectio.ObjectFS, file
name := blockio.EncodeGCMetadataFileName(GCMetaDir, PrefixGCMeta, start, end)
writer, err = objectio.NewObjectWriterSpecial(objectio.WriterGC, name, fs.Service)
if err != nil {
return nil, err
return "", err
}
for i := range bats {
if _, err := writer.WriteWithoutSeqnum(containers.ToCNBatch(bats[i])); err != nil {
return nil, err
return "", err
}
}

blocks, err = writer.WriteEnd(context.Background())
writeCost = time.Since(now)
return blocks, err
return name, err
}

func (t *GCTable) rebuildTable(bats []*containers.Batch, idx BatchType, objects map[string]*ObjectEntry) {
Expand Down
8 changes: 7 additions & 1 deletion pkg/vm/engine/tae/db/open.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,13 @@ func Open(ctx context.Context, dirname string, opts *options.Options) (db *DB, e
scanner)
db.BGScanner.Start()
// TODO: WithGCInterval requires configuration parameters
cleaner := gc2.NewCheckpointCleaner(opts.Ctx, opts.SID, fs, db.BGCheckpointRunner, opts.GCCfg.DisableGC)
cleaner := gc2.NewCheckpointCleaner(
opts.Ctx,
opts.SID,
fs,
db.BGCheckpointRunner,
opts.GCCfg.DisableGC,
)
cleaner.SetCheckGC(opts.GCCfg.CheckGC)
cleaner.AddChecker(
func(item any) bool {
Expand Down
12 changes: 12 additions & 0 deletions pkg/vm/engine/tae/db/test/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6516,6 +6516,18 @@ func TestAppendAndGC2(t *testing.T) {
}
logutil.Infof("file %s in meta files", file)
}

// check gc meta files
var gcFile bool
for file := range files {
if strings.Contains(file, "/gc_") && strings.Contains(file, ".ckp") {
gcFile = true
break
}
}
if !gcFile {
panic("gc meta files not found")
}
}

func TestSnapshotGC(t *testing.T) {
Expand Down

0 comments on commit 2e35c5e

Please sign in to comment.