Skip to content

Commit

Permalink
feat: mo gc tool (#21003)
Browse files Browse the repository at this point in the history
  • Loading branch information
Wenbin1002 authored Jan 2, 2025
1 parent 130c4eb commit d45a6e4
Show file tree
Hide file tree
Showing 3 changed files with 441 additions and 0 deletions.
3 changes: 3 additions & 0 deletions pkg/vm/engine/tae/rpc/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ func initCommand(_ context.Context, inspectCtx *inspectContext) *cobra.Command {
inspect := &MoInspectArg{}
rootCmd.AddCommand(inspect.PrepareCommand())

gc := &GCArg{}
rootCmd.AddCommand(gc.PrepareCommand())

return rootCmd
}

Expand Down
353 changes: 353 additions & 0 deletions pkg/vm/engine/tae/rpc/tool.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
package rpc

import (
"bufio"
"context"
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
Expand All @@ -32,6 +34,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/defines"
"github.com/matrixorigin/matrixone/pkg/fileservice"
"github.com/matrixorigin/matrixone/pkg/objectio"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/checkpoint"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail"
)
Expand Down Expand Up @@ -142,6 +145,9 @@ func (c *MoInspectArg) PrepareCommand() *cobra.Command {
ckp := CheckpointArg{}
moInspectCmd.AddCommand(ckp.PrepareCommand())

gc := gcRemoveArg{}
moInspectCmd.AddCommand(gc.PrepareCommand())

return moInspectCmd
}

Expand Down Expand Up @@ -1279,3 +1285,350 @@ func (c *ckpListArg) getTableList(ctx context.Context) (res string, err error) {

return
}

type GCArg struct {
}

func (c *GCArg) PrepareCommand() *cobra.Command {
gcCmd := &cobra.Command{
Use: "gc",
Short: "gc",
Long: "Display information about a given gc",
Run: RunFactory(c),
}

gcCmd.SetUsageTemplate(c.Usage())

dump := gcDumpArg{}
gcCmd.AddCommand(dump.PrepareCommand())

remove := gcRemoveArg{}
gcCmd.AddCommand(remove.PrepareCommand())

return gcCmd
}

func (c *GCArg) FromCommand(cmd *cobra.Command) (err error) {
return nil
}

func (c *GCArg) String() string {
return "gc"
}

func (c *GCArg) Usage() (res string) {
res += "Available Commands:\n"
res += fmt.Sprintf(" %-5v show gc information\n", "stat")

res += "\n"
res += "Usage:\n"
res += "inspect table [flags] [options]\n"

res += "\n"
res += "Use \"mo-tool inspect table <command> --help\" for more information about a given command.\n"

return
}

func (c *GCArg) Run() error {
return nil
}

type gcDumpArg struct {
ctx *inspectContext
file string
res string
}

func (c *gcDumpArg) PrepareCommand() *cobra.Command {
gcDumpCmd := &cobra.Command{
Use: "dump",
Short: "gc dump",
Long: "Display information about a given gc",
Run: RunFactory(c),
}

gcDumpCmd.SetUsageTemplate(c.Usage())

gcDumpCmd.Flags().StringP("file", "f", "", "file to dump")

return gcDumpCmd
}

func (c *gcDumpArg) FromCommand(cmd *cobra.Command) (err error) {
if cmd.Flag("ictx") != nil {
c.ctx = cmd.Flag("ictx").Value.(*inspectContext)
}
c.file, _ = cmd.Flags().GetString("file")
return nil
}

func (c *gcDumpArg) String() string {
return c.res
}

func (c *gcDumpArg) Usage() (res string) {
res += "Examples:\n"
res += " # Dump the pinned objects to the file\n"
res += " inspect gc dump -f /your/path/file"
return
}

func (c *gcDumpArg) Run() (err error) {
if c.ctx == nil {
return moerr.NewInfoNoCtx("it is an online command")
}
ctx := context.Background()
now := time.Now().Unix()

err = os.MkdirAll(filepath.Dir(c.file), 0755)
if err != nil {
err = moerr.NewInternalErrorNoCtx(fmt.Sprintf("Error creating directory: %v", err))
return
}
file, err := os.OpenFile(c.file, os.O_CREATE|os.O_WRONLY|os.O_APPEND|os.O_TRUNC, 0644)
if err != nil {
return
}
defer file.Close()

pinnedObjects := make(map[string]bool)

if err = c.getInMemObjects(pinnedObjects); err != nil {
return
}
if err = c.getCheckpointObject(ctx, pinnedObjects); err != nil {
return
}

for obj := range pinnedObjects {
_, err = file.WriteString(obj + "\n")
if err != nil {
err = moerr.NewInternalErrorNoCtx(fmt.Sprintf("Error writing to file: %v", err))
return
}
}

c.res = fmt.Sprintf("Dumped pinned objects to file, file count %v, start tmie %v", len(pinnedObjects), now)

return
}

func (c *gcDumpArg) getInMemObjects(pinnedObjects map[string]bool) (err error) {
dbIt := c.ctx.db.Catalog.MakeDBIt(false)
for dbIt.Valid() {
db := dbIt.Get().GetPayload()
tableIt := db.MakeTableIt(false)
for tableIt.Valid() {
table := tableIt.Get().GetPayload()
lp := new(catalog.LoopProcessor)
lp.TombstoneFn = func(be *catalog.ObjectEntry) error {
pinnedObjects[be.ObjectName().String()] = true
return nil
}
lp.ObjectFn = func(be *catalog.ObjectEntry) error {
pinnedObjects[be.ObjectName().String()] = true
return nil
}
if err = table.RecurLoop(lp); err != nil {
return
}
tableIt.Next()
}
dbIt.Next()
}
return
}

func (c *gcDumpArg) getCheckpointObject(ctx context.Context, pinned map[string]bool) (err error) {
entries := c.ctx.db.BGCheckpointRunner.GetAllCheckpoints()
for _, entry := range entries {
cnLoc := entry.GetLocation()
cnObj := cnLoc.Name().String()
pinned[cnObj] = true

tnLoc := entry.GetTNLocation()
tnObj := tnLoc.Name().String()
pinned[tnObj] = true

data, err := getCkpData(ctx, entry, c.ctx.db.Runtime.Fs)
if err != nil {
return moerr.NewInfoNoCtx(fmt.Sprintf("failed to get checkpoint data %v, %v", entry.LSN(), err))
}
getObjectsFromCkpMeta(data, pinned)
getObjectsFromCkpData(data, pinned)
}
return
}

func getObjectsFromCkpMeta(data *logtail.CheckpointData, pinned map[string]bool) {
bats := data.GetBatches()

metaBat := bats[logtail.MetaIDX]
metaAttr := logtail.MetaSchemaAttr
for _, attr := range metaAttr {
if attr == logtail.SnapshotAttr_TID {
continue
}
vec := metaBat.GetVectorByName(attr)
for i := 0; i < vec.Length(); i++ {
v := vec.Get(i).([]byte)
if len(v) == 0 {
continue
}
loc := objectio.Location(v)
obj := loc.Name().String()
pinned[obj] = true
}
}

tnBat := bats[logtail.TNMetaIDX]
vec := tnBat.GetVectorByName(logtail.CheckpointMetaAttr_BlockLocation)
for i := 0; i < vec.Length(); i++ {
v := vec.Get(i).([]byte)
if len(v) == 0 {
continue
}
loc := objectio.Location(v)
obj := loc.Name().String()
pinned[obj] = true
}
}

func getObjectsFromCkpData(data *logtail.CheckpointData, pinned map[string]bool) {
bat := data.GetObjectBatchs()
vec := bat.GetVectorByName(logtail.ObjectAttr_ObjectStats)
for i := 0; i < vec.Length(); i++ {
v := vec.Get(i).([]byte)
obj := objectio.ObjectStats(v)
pinned[obj.ObjectName().String()] = true
}

bat = data.GetTombstoneObjectBatchs()
vec = bat.GetVectorByName(logtail.ObjectAttr_ObjectStats)
for i := 0; i < vec.Length(); i++ {
v := vec.Get(i).([]byte)
obj := objectio.ObjectStats(v)
pinned[obj.ObjectName().String()] = true
}
}

type gcRemoveArg struct {
file string
oriDir string
tarDir string
modTime int64
dry bool
res string
}

func (c *gcRemoveArg) PrepareCommand() *cobra.Command {
gcRemoveCmd := &cobra.Command{
Use: "remove",
Short: "gc remove",
Long: "Remove objects from the given file",
Run: RunFactory(c),
}

gcRemoveCmd.SetUsageTemplate(c.Usage())

gcRemoveCmd.Flags().StringP("file", "f", "", "file to remove")
gcRemoveCmd.Flags().StringP("ori", "o", "", "original directory")
gcRemoveCmd.Flags().StringP("tar", "t", "", "target directory")
gcRemoveCmd.Flags().Int64P("mod", "m", 0, "modified time")
gcRemoveCmd.Flags().BoolP("dry", "", false, "dry run")

return gcRemoveCmd
}

func (c *gcRemoveArg) FromCommand(cmd *cobra.Command) (err error) {
c.file, _ = cmd.Flags().GetString("file")
c.oriDir, _ = cmd.Flags().GetString("ori")
c.tarDir, _ = cmd.Flags().GetString("tar")
c.modTime, _ = cmd.Flags().GetInt64("mod")
c.dry, _ = cmd.Flags().GetBool("dry")
return nil
}

func (c *gcRemoveArg) String() string {
return c.res
}

func (c *gcRemoveArg) Usage() (res string) {
res += "Examples:\n"
res += " # Remove objects from the given file\n"
res += " inspect gc remove -f file -o ori -t tar\n"
res += " # Remove objects from the given file with modified time\n"
res += " inspect gc remove -f file -o ori -t tar -m 1620000000"
res += " # Dry run to remove objects from the given file\n"
res += " inspect gc remove -f file -o ori -t tar -d"
return
}

func (c *gcRemoveArg) Run() (err error) {
if c.file == "" || c.oriDir == "" || c.tarDir == "" {
return moerr.NewInfoNoCtx("invalid inputs")
}

file, err := os.Open(c.file)
if err != nil {
return moerr.NewInfoNoCtx(fmt.Sprintf("failed to open file %v, %v", c.file, err))
}
defer file.Close()

pinned := make(map[string]bool)
scanner := bufio.NewScanner(file)
for scanner.Scan() {
obj := scanner.Text()
pinned[obj] = true
}

files, err := os.ReadDir(c.oriDir)
if err != nil {
return moerr.NewInfoNoCtx(fmt.Sprintf("failed to read directory %v, %v", c.oriDir, err))
}

err = os.MkdirAll(c.tarDir, 0755)
if err != nil {
err = moerr.NewInternalErrorNoCtx(fmt.Sprintf("Error creating directory: %v", err))
return
}

toMove := make([]string, 0)
for _, obj := range files {
if obj.IsDir() || pinned[obj.Name()] {
continue
}
info, err := obj.Info()
if err != nil {
return moerr.NewInfoNoCtx(fmt.Sprintf("failed to get file info %v, %v", obj.Name(), err))
}
modTime := info.ModTime()
if c.modTime != 0 {
if modTime.Unix() > c.modTime {
continue
}
} else if time.Since(modTime).Hours() < 5*24 {
continue
}
toMove = append(toMove, obj.Name())
}

if c.dry {
c.res = fmt.Sprintf("Dry run, to remove objects %v", len(toMove))
return
}

for _, obj := range toMove {
src := filepath.Join(c.oriDir, obj)
dst := filepath.Join(c.tarDir, obj)
err = os.Rename(src, dst)
if err != nil {
return moerr.NewInfoNoCtx(fmt.Sprintf("failed to move file %v to %v, %v", src, dst, err))
}
}

c.res = fmt.Sprintf("Moved objects from %v to %v, objects count %v", c.oriDir, c.tarDir, len(toMove))

return
}
Loading

0 comments on commit d45a6e4

Please sign in to comment.