From 2e35c5ef11bbce6607a52d5bbd0edef79ae26b1b Mon Sep 17 00:00:00 2001 From: LiuBo Date: Sat, 12 Oct 2024 04:52:23 +0800 Subject: [PATCH] [feature] datasync: add gc meta files to WAL (#19216) add gc meta files to WAL Approved by: @XuPeng-SH, @sukki37 --- pkg/datasync/entry.go | 8 +++-- pkg/vm/engine/tae/db/checkpoint/info.go | 2 ++ pkg/vm/engine/tae/db/checkpoint/runner.go | 4 +++ pkg/vm/engine/tae/db/gc/v2/checkpoint.go | 44 ++++++++++++++++++++--- pkg/vm/engine/tae/db/gc/v2/table.go | 24 +++++++------ pkg/vm/engine/tae/db/open.go | 8 ++++- pkg/vm/engine/tae/db/test/db_test.go | 12 +++++++ 7 files changed, 85 insertions(+), 17 deletions(-) diff --git a/pkg/datasync/entry.go b/pkg/datasync/entry.go index c2d0612d8c3ea..614a85194e56e 100644 --- a/pkg/datasync/entry.go +++ b/pkg/datasync/entry.go @@ -16,6 +16,7 @@ package datasync import ( "bytes" + "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/logutil" @@ -72,12 +73,13 @@ func getLocations(rec logservice.LogRecord) []string { if ei.Group == wal.GroupPrepare { locations = append(locations, parseCommonFiles(payload)...) } else if ei.Group == store.GroupFiles { - locations = append(locations, parseCheckpointFiles(payload)...) + locations = append(locations, parseMetaFiles(payload)...) } } return locations } +// parseCommonFiles parses the common files which are in the root directory. func parseCommonFiles(payload []byte) []string { if len(payload) < entryHeaderSize { return nil @@ -118,7 +120,8 @@ func parseCommonFiles(payload []byte) []string { return locations } -func parseCheckpointFiles(payload []byte) []string { +// parseMetaFiles parses the meta files which are in ckp/ or gc/ directory. 
+func parseMetaFiles(payload []byte) []string { vec := vector.NewVec(types.Type{}) if err := vec.UnmarshalBinary(payload); err != nil { logutil.Errorf("failed to unmarshal checkpoint file: %v", err) @@ -127,6 +130,7 @@ func parseCheckpointFiles(payload []byte) []string { var locations []string for i := 0; i < vec.Length(); i++ { file := vec.GetStringAt(i) + logutil.Infof("parsed meta file: %s", file) locations = append(locations, file) } return locations diff --git a/pkg/vm/engine/tae/db/checkpoint/info.go b/pkg/vm/engine/tae/db/checkpoint/info.go index a97def91e7944..5062626d76ce3 100644 --- a/pkg/vm/engine/tae/db/checkpoint/info.go +++ b/pkg/vm/engine/tae/db/checkpoint/info.go @@ -16,6 +16,7 @@ package checkpoint import ( "context" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/wal" "time" "github.com/matrixorigin/matrixone/pkg/container/types" @@ -38,6 +39,7 @@ type RunnerReader interface { GetCheckpointMetaFiles() map[string]struct{} RemoveCheckpointMetaFile(string) AddCheckpointMetaFile(string) + GetDriver() wal.Driver } func (r *runner) collectCheckpointMetadata(start, end types.TS, ckpLSN, truncateLSN uint64) *containers.Batch { diff --git a/pkg/vm/engine/tae/db/checkpoint/runner.go b/pkg/vm/engine/tae/db/checkpoint/runner.go index d4a843a368820..e66929fda2b67 100644 --- a/pkg/vm/engine/tae/db/checkpoint/runner.go +++ b/pkg/vm/engine/tae/db/checkpoint/runner.go @@ -310,6 +310,10 @@ func (r *runner) AddCheckpointMetaFile(name string) { r.checkpointMetaFiles.files[name] = struct{}{} } +func (r *runner) GetDriver() wal.Driver { + return r.wal +} + func (r *runner) RemoveCheckpointMetaFile(name string) { r.checkpointMetaFiles.Lock() defer r.checkpointMetaFiles.Unlock() diff --git a/pkg/vm/engine/tae/db/gc/v2/checkpoint.go b/pkg/vm/engine/tae/db/gc/v2/checkpoint.go index e473264da418f..97a3fb8d1d39b 100644 --- a/pkg/vm/engine/tae/db/gc/v2/checkpoint.go +++ b/pkg/vm/engine/tae/db/gc/v2/checkpoint.go @@ -32,6 +32,7 @@ import ( 
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/checkpoint" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logstore/store" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/testutils" "go.uber.org/zap" @@ -478,7 +479,7 @@ func (c *checkpointCleaner) mergeGCFile() error { mergeTable = c.inputs.tables[0] } c.inputs.RUnlock() - _, err = mergeTable.SaveFullTable(maxConsumed.GetStart(), maxConsumed.GetEnd(), c.fs, nil) + name, err := mergeTable.SaveFullTable(maxConsumed.GetStart(), maxConsumed.GetEnd(), c.fs) if err != nil { logutil.Errorf("SaveTable failed: %v", err.Error()) return err @@ -489,6 +490,11 @@ func (c *checkpointCleaner) mergeGCFile() error { return err } c.updateMinMerged(maxConsumed) + + if err := c.appendFilesToWAL(name); err != nil { + logutil.Errorf("append gc file to WAL failed: %v", err) + return err + } return nil } @@ -1018,6 +1024,7 @@ func (c *checkpointCleaner) createNewInput( zap.String("snapshots", c.snapshotMeta.String())) }() var data *logtail.CheckpointData + var gcFiles []string for _, candidate := range ckps { startts, endts := candidate.GetStart(), candidate.GetEnd() data, err = c.collectCkpData(candidate) @@ -1037,6 +1044,9 @@ func (c *checkpointCleaner) createNewInput( logutil.Errorf("SaveMeta is failed") return } + if snapSize > 0 { + gcFiles = append(gcFiles, name) + } name = blockio.EncodeTableMetadataFileName(GCMetaDir, PrefixAcctMeta, ckps[0].GetStart(), ckps[len(ckps)-1].GetEnd()) tableSize, err = c.snapshotMeta.SaveTableInfo(name, c.fs.Service) @@ -1044,20 +1054,46 @@ func (c *checkpointCleaner) createNewInput( logutil.Errorf("SaveTableInfo is failed") return } - files := c.GetAndClearOutputs() - _, err = input.SaveTable( + if tableSize > 0 { + gcFiles = append(gcFiles, name) + } + + name, err = input.SaveTable( 
ckps[0].GetStart(), ckps[len(ckps)-1].GetEnd(), c.fs, - files, ) if err != nil { return } + gcFiles = append(gcFiles, name) + + err = c.appendFilesToWAL(gcFiles...) + if err != nil { + logutil.Errorf("append gc files to WAL failed: %v", err) + return + } return } +// appendFilesToWAL appends the GC meta files to WAL. +func (c *checkpointCleaner) appendFilesToWAL(files ...string) error { + driver := c.ckpClient.GetDriver() + if driver == nil { + return nil + } + entry, err := store.BuildFilesEntry(files) + if err != nil { + return err + } + _, err = driver.AppendEntry(store.GroupFiles, entry) + if err != nil { + return err + } + return nil +} + func (c *checkpointCleaner) updateSnapshot( ctx context.Context, fs fileservice.FileService, diff --git a/pkg/vm/engine/tae/db/gc/v2/table.go b/pkg/vm/engine/tae/db/gc/v2/table.go index 0931eedfd107d..1b6191f8c2fea 100644 --- a/pkg/vm/engine/tae/db/gc/v2/table.go +++ b/pkg/vm/engine/tae/db/gc/v2/table.go @@ -275,27 +275,31 @@ func (t *GCTable) collectData() []*containers.Batch { return bats } -// SaveTable is to write data to s3 -func (t *GCTable) SaveTable(start, end types.TS, fs *objectio.ObjectFS, files []string) ([]objectio.BlockObject, error) { +// SaveTable is to write data to s3. +// It returns the gc meta file name and the error. 
+func (t *GCTable) SaveTable(start, end types.TS, fs *objectio.ObjectFS) (string, error) { bats := t.collectData() defer t.closeBatch(bats) name := blockio.EncodeCheckpointMetadataFileName(GCMetaDir, PrefixGCMeta, start, end) writer, err := objectio.NewObjectWriterSpecial(objectio.WriterGC, name, fs.Service) if err != nil { - return nil, err + return "", err } for i := range bats { if _, err := writer.WriteWithoutSeqnum(containers.ToCNBatch(bats[i])); err != nil { - return nil, err + return "", err } } - blocks, err := writer.WriteEnd(context.Background()) - return blocks, err + _, err = writer.WriteEnd(context.Background()) + if err != nil { + return "", err + } + return name, nil } // SaveFullTable is to write data to s3 -func (t *GCTable) SaveFullTable(start, end types.TS, fs *objectio.ObjectFS, files []string) ([]objectio.BlockObject, error) { +func (t *GCTable) SaveFullTable(start, end types.TS, fs *objectio.ObjectFS) (string, error) { now := time.Now() var bats []*containers.Batch var blocks []objectio.BlockObject @@ -330,17 +334,17 @@ func (t *GCTable) SaveFullTable(start, end types.TS, fs *objectio.ObjectFS, file name := blockio.EncodeGCMetadataFileName(GCMetaDir, PrefixGCMeta, start, end) writer, err = objectio.NewObjectWriterSpecial(objectio.WriterGC, name, fs.Service) if err != nil { - return nil, err + return "", err } for i := range bats { if _, err := writer.WriteWithoutSeqnum(containers.ToCNBatch(bats[i])); err != nil { - return nil, err + return "", err } } blocks, err = writer.WriteEnd(context.Background()) writeCost = time.Since(now) - return blocks, err + return name, err } func (t *GCTable) rebuildTable(bats []*containers.Batch, idx BatchType, objects map[string]*ObjectEntry) { diff --git a/pkg/vm/engine/tae/db/open.go b/pkg/vm/engine/tae/db/open.go index da80b05ef2f8c..19f52f2a0f820 100644 --- a/pkg/vm/engine/tae/db/open.go +++ b/pkg/vm/engine/tae/db/open.go @@ -248,7 +248,13 @@ func Open(ctx context.Context, dirname string, opts 
*options.Options) (db *DB, e scanner) db.BGScanner.Start() // TODO: WithGCInterval requires configuration parameters - cleaner := gc2.NewCheckpointCleaner(opts.Ctx, opts.SID, fs, db.BGCheckpointRunner, opts.GCCfg.DisableGC) + cleaner := gc2.NewCheckpointCleaner( + opts.Ctx, + opts.SID, + fs, + db.BGCheckpointRunner, + opts.GCCfg.DisableGC, + ) cleaner.SetCheckGC(opts.GCCfg.CheckGC) cleaner.AddChecker( func(item any) bool { diff --git a/pkg/vm/engine/tae/db/test/db_test.go b/pkg/vm/engine/tae/db/test/db_test.go index a2909f8dbbf0b..91e13479c04e1 100644 --- a/pkg/vm/engine/tae/db/test/db_test.go +++ b/pkg/vm/engine/tae/db/test/db_test.go @@ -6516,6 +6516,18 @@ func TestAppendAndGC2(t *testing.T) { } logutil.Infof("file %s in meta files", file) } + + // check gc meta files + var gcFile bool + for file := range files { + if strings.Contains(file, "/gc_") && strings.Contains(file, ".ckp") { + gcFile = true + break + } + } + if !gcFile { + panic("gc meta files not found") + } } func TestSnapshotGC(t *testing.T) {