Skip to content

Commit

Permalink
cleanup checkpoint related code (#21037)
Browse files Browse the repository at this point in the history
cleanup checkpoint-related code

Approved by: @LeftHandCold, @aressu1985
  • Loading branch information
XuPeng-SH authored Jan 1, 2025
1 parent 926e28f commit 63d4557
Show file tree
Hide file tree
Showing 21 changed files with 227 additions and 258 deletions.
12 changes: 6 additions & 6 deletions pkg/backup/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func TestBackupData(t *testing.T) {
currTs := types.BuildTS(backupTime.UnixNano(), 0)
locations := make([]string, 0)
locations = append(locations, backupTime.Format(time.DateTime))
location, err := db.ForceCheckpointForBackup(ctx, currTs, 20*time.Second)
location, err := db.ForceCheckpointForBackup(ctx, currTs)
assert.Nil(t, err)
_, err = db.BGCheckpointRunner.DisableCheckpoint(ctx)
assert.NoError(t, err)
Expand Down Expand Up @@ -231,7 +231,7 @@ func TestBackupData2(t *testing.T) {
currTs := types.BuildTS(backupTime.UnixNano(), 0)
locations := make([]string, 0)
locations = append(locations, backupTime.Format(time.DateTime))
location, err := db.ForceCheckpointForBackup(ctx, currTs, 20*time.Second)
location, err := db.ForceCheckpointForBackup(ctx, currTs)
assert.Nil(t, err)
_, err = db.BGCheckpointRunner.DisableCheckpoint(ctx)
assert.NoError(t, err)
Expand Down Expand Up @@ -302,7 +302,7 @@ func TestBackupData3(t *testing.T) {
currTs := types.BuildTS(backupTime.UnixNano(), 0)
locations := make([]string, 0)
locations = append(locations, backupTime.Format(time.DateTime))
location, err := db.ForceCheckpointForBackup(ctx, currTs, 20*time.Second)
location, err := db.ForceCheckpointForBackup(ctx, currTs)
assert.Nil(t, err)
_, err = db.BGCheckpointRunner.DisableCheckpoint(ctx)
assert.NoError(t, err)
Expand Down Expand Up @@ -338,7 +338,7 @@ func TestBackupData3(t *testing.T) {
testutil.CheckAllColRowsByScan(t, rel, int(totalRows), true)
assert.NoError(t, txn.Commit(context.Background()))
db.MergeBlocks(true)
db.ForceGlobalCheckpoint(ctx, db.TxnMgr.Now(), time.Second, time.Second)
db.ForceGlobalCheckpoint(ctx, db.TxnMgr.Now(), time.Second)
t.Log(db.Catalog.SimplePPString(3))
db.Restart(ctx)

Expand Down Expand Up @@ -383,7 +383,7 @@ func TestBackupData4(t *testing.T) {
currTs := types.BuildTS(backupTime.UnixNano(), 0)
locations := make([]string, 0)
locations = append(locations, backupTime.Format(time.DateTime))
location, err := db.ForceCheckpointForBackup(ctx, currTs, 20*time.Second)
location, err := db.ForceCheckpointForBackup(ctx, currTs)
assert.Nil(t, err)
_, err = db.BGCheckpointRunner.DisableCheckpoint(ctx)
assert.NoError(t, err)
Expand Down Expand Up @@ -419,7 +419,7 @@ func TestBackupData4(t *testing.T) {
testutil.CheckAllColRowsByScan(t, rel, int(totalRows), true)
assert.NoError(t, txn.Commit(context.Background()))
db.MergeBlocks(true)
db.ForceGlobalCheckpoint(ctx, db.TxnMgr.Now(), time.Second, time.Second)
db.ForceGlobalCheckpoint(ctx, db.TxnMgr.Now(), time.Second)
t.Log(db.Catalog.SimplePPString(3))
db.Restart(ctx)

Expand Down
17 changes: 4 additions & 13 deletions pkg/vm/engine/tae/common/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,16 @@ package common
import (
"context"
"time"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
)

type RetryOp = func() error
type WaitOp = func() (ok bool, err error)

func RetryWithIntervalAndTimeout(
func RetryWithInterval(
ctx context.Context,
op WaitOp,
timeout time.Duration,
interval time.Duration,
suppressTimout bool,
) (err error) {

ctx, cancel := context.WithTimeoutCause(context.Background(), timeout, moerr.CauseRetryWithIntervalAndTimeout)
defer cancel()

ticker := time.NewTicker(interval)
defer ticker.Stop()

Expand All @@ -46,10 +39,8 @@ func RetryWithIntervalAndTimeout(
for {
select {
case <-ctx.Done():
if suppressTimout {
return moerr.GetOkExpectedEOB()
}
return moerr.AttachCause(ctx, moerr.NewInternalError(ctx, "timeout"))
err = context.Cause(ctx)
return
case <-ticker.C:
ok, err = op()
if ok {
Expand Down
14 changes: 6 additions & 8 deletions pkg/vm/engine/tae/db/checkpoint/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (job *checkpointJob) RunGCKP(ctx context.Context) (err error) {
job.gckpCtx.end,
job.gckpCtx.ckpLSN,
job.gckpCtx.truncateLSN,
job.gckpCtx.interval,
job.gckpCtx.histroyRetention,
)

return
Expand Down Expand Up @@ -225,9 +225,7 @@ func (job *checkpointJob) RunICKP(ctx context.Context) (err error) {

var files []string
var file string
if fields, files, err = runner.doIncrementalCheckpoint(
job.executor.ctx, &job.executor.cfg, entry,
); err != nil {
if fields, files, err = job.executor.doIncrementalCheckpoint(entry); err != nil {
errPhase = "do-ckp"
rollback()
return
Expand Down Expand Up @@ -275,10 +273,10 @@ func (job *checkpointJob) RunICKP(ctx context.Context) (err error) {

runner.postCheckpointQueue.Enqueue(entry)
runner.TryTriggerExecuteGCKP(&gckpContext{
end: entry.end,
interval: job.executor.cfg.GlobalHistoryDuration,
ckpLSN: lsn,
truncateLSN: lsnToTruncate,
end: entry.end,
histroyRetention: job.executor.cfg.GlobalHistoryDuration,
ckpLSN: lsn,
truncateLSN: lsnToTruncate,
})

return nil
Expand Down
48 changes: 24 additions & 24 deletions pkg/vm/engine/tae/db/checkpoint/flusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,17 @@ type FlushMutableCfg struct {
ForceFlushCheckInterval time.Duration
}

type tableAndSize struct {
tbl *catalog.TableEntry
asize int
dsize int
}

type Flusher interface {
IsAllChangesFlushed(start, end types.TS, doPrint bool) bool
FlushTable(ctx context.Context, dbID, tableID uint64, ts types.TS) error
ForceFlush(ctx context.Context, ts types.TS, duration time.Duration) error
ForceFlushWithInterval(ctx context.Context, ts types.TS, forceDuration, flushInterval time.Duration) (err error)
ForceFlush(ctx context.Context, ts types.TS) error
ForceFlushWithInterval(ctx context.Context, ts types.TS, flushInterval time.Duration) (err error)
ChangeForceFlushTimeout(timeout time.Duration)
ChangeForceCheckInterval(interval time.Duration)
GetCfg() FlushCfg
Expand Down Expand Up @@ -212,22 +218,22 @@ func (f *flusher) FlushTable(ctx context.Context, dbID, tableID uint64, ts types
return impl.FlushTable(ctx, dbID, tableID, ts)
}

func (f *flusher) ForceFlush(ctx context.Context, ts types.TS, duration time.Duration) error {
func (f *flusher) ForceFlush(ctx context.Context, ts types.TS) error {
impl := f.impl.Load()
if impl == nil {
return ErrFlusherStopped
}
return impl.ForceFlush(ctx, ts, duration)
return impl.ForceFlush(ctx, ts)
}

func (f *flusher) ForceFlushWithInterval(
ctx context.Context, ts types.TS, forceDuration, flushInterval time.Duration,
ctx context.Context, ts types.TS, flushInterval time.Duration,
) (err error) {
impl := f.impl.Load()
if impl == nil {
return ErrFlusherStopped
}
return impl.ForceFlushWithInterval(ctx, ts, forceDuration, flushInterval)
return impl.ForceFlushWithInterval(ctx, ts, flushInterval)
}

func (f *flusher) ChangeForceFlushTimeout(timeout time.Duration) {
Expand Down Expand Up @@ -593,15 +599,17 @@ func (flusher *flushImpl) ChangeForceCheckInterval(interval time.Duration) {
}

func (flusher *flushImpl) ForceFlush(
ctx context.Context, ts types.TS, forceDuration time.Duration,
ctx context.Context, ts types.TS,
) (err error) {
return flusher.ForceFlushWithInterval(
ctx, ts, forceDuration, 0,
ctx, ts, 0,
)
}

func (flusher *flushImpl) ForceFlushWithInterval(
ctx context.Context, ts types.TS, forceDuration, flushInterval time.Duration,
ctx context.Context,
ts types.TS,
flushInterval time.Duration,
) (err error) {
makeRequest := func() *FlushRequest {
tree := flusher.sourcer.ScanInRangePruned(types.TS{}, ts)
Expand Down Expand Up @@ -629,17 +637,13 @@ func (flusher *flushImpl) ForceFlushWithInterval(

cfg := flusher.mutableCfg.Load()

if forceDuration <= 0 {
forceDuration = cfg.ForceFlushTimeout
}
if flushInterval <= 0 {
flushInterval = cfg.ForceFlushCheckInterval
}
if err = common.RetryWithIntervalAndTimeout(
if err = common.RetryWithInterval(
ctx,
op,
forceDuration,
flushInterval,
false,
); err != nil {
return moerr.NewInternalErrorf(ctx, "force flush failed: %v", err)
}
Expand All @@ -648,7 +652,6 @@ func (flusher *flushImpl) ForceFlushWithInterval(
err = moerr.NewInternalError(ctx, sarg)
}
return

}

func (flusher *flushImpl) GetCfg() FlushCfg {
Expand All @@ -662,7 +665,9 @@ func (flusher *flushImpl) GetCfg() FlushCfg {
}

func (flusher *flushImpl) FlushTable(
ctx context.Context, dbID, tableID uint64, ts types.TS,
ctx context.Context,
dbID, tableID uint64,
ts types.TS,
) (err error) {
iarg, sarg, flush := fault.TriggerFault("flush_table_error")
if flush && (iarg == 0 || rand.Int63n(iarg) == 0) {
Expand Down Expand Up @@ -698,16 +703,11 @@ func (flusher *flushImpl) FlushTable(

cfg := flusher.mutableCfg.Load()

err = common.RetryWithIntervalAndTimeout(
err = common.RetryWithInterval(
ctx,
op,
cfg.ForceFlushTimeout,
cfg.ForceFlushCheckInterval,
true,
)
if moerr.IsMoErrCode(err, moerr.ErrInternal) || moerr.IsMoErrCode(err, moerr.OkExpectedEOB) {
logutil.Warnf("Flush %d-%d :%v", dbID, tableID, err)
return nil
}
return
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/vm/engine/tae/db/checkpoint/flusher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ func Test_RestartFlusher(t *testing.T) {
var ts types.TS

assert.Equal(t, ErrFlusherStopped, f.FlushTable(ctx, 0, 0, ts))
assert.Equal(t, ErrFlusherStopped, f.ForceFlush(ctx, ts, time.Millisecond))
assert.Equal(t, ErrFlusherStopped, f.ForceFlushWithInterval(ctx, ts, time.Millisecond, time.Millisecond))
assert.Equal(t, ErrFlusherStopped, f.ForceFlush(ctx, ts))
assert.Equal(t, ErrFlusherStopped, f.ForceFlushWithInterval(ctx, ts, time.Millisecond))
f.ChangeForceCheckInterval(time.Millisecond)
f.ChangeForceFlushTimeout(time.Millisecond)
f.Start()
Expand Down
4 changes: 2 additions & 2 deletions pkg/vm/engine/tae/db/checkpoint/gckp_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ func (executor *checkpointExecutor) onGCKPEntries(items ...any) {
return
}

if mergedCtx.interval == 0 {
mergedCtx.interval = executor.cfg.GlobalHistoryDuration
if mergedCtx.histroyRetention == 0 {
mergedCtx.histroyRetention = executor.cfg.GlobalHistoryDuration
}

fromEntry := executor.runner.store.MaxGlobalCheckpoint()
Expand Down
34 changes: 34 additions & 0 deletions pkg/vm/engine/tae/db/checkpoint/ickp_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ import (

"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/objectio"
"github.com/matrixorigin/matrixone/pkg/perfcounter"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -231,3 +234,34 @@ func (executor *checkpointExecutor) RunICKP() (err error) {
func (executor *checkpointExecutor) onICKPEntries(items ...any) {
executor.RunICKP()
}

func (executor *checkpointExecutor) doIncrementalCheckpoint(
entry *CheckpointEntry,
) (fields []zap.Field, files []string, err error) {
factory := logtail.IncrementalCheckpointDataFactory(
executor.runner.rt.SID(), entry.start, entry.end, true,
)
data, err := factory(executor.runner.catalog)
if err != nil {
return
}
fields = data.ExportStats("")
defer data.Close()
var cnLocation, tnLocation objectio.Location
cnLocation, tnLocation, files, err = data.WriteTo(
executor.ctx,
executor.cfg.BlockMaxRowsHint,
executor.cfg.SizeHint,
executor.runner.rt.Fs.Service,
)
if err != nil {
return
}
files = append(files, cnLocation.Name().String())
entry.SetLocation(cnLocation, tnLocation)

perfcounter.Update(executor.ctx, func(counter *perfcounter.CounterSet) {
counter.TAE.CheckPoint.DoIncrementalCheckpoint.Add(1)
})
return
}
5 changes: 0 additions & 5 deletions pkg/vm/engine/tae/db/checkpoint/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ type RunnerReader interface {
GetAllIncrementalCheckpoints() []*CheckpointEntry
GetAllGlobalCheckpoints() []*CheckpointEntry
GetPenddingIncrementalCount() int
GetGlobalCheckpointCount() int
CollectCheckpointsInRange(ctx context.Context, start, end types.TS) (ckpLoc string, lastEnd types.TS, err error)
ICKPSeekLT(ts types.TS, cnt int) []*CheckpointEntry
GetLowWaterMark() types.TS
Expand Down Expand Up @@ -153,10 +152,6 @@ func (r *runner) GetPenddingIncrementalCount() int {
return r.store.GetPenddingIncrementalCount()
}

func (r *runner) GetGlobalCheckpointCount() int {
return r.store.GetGlobalCheckpointCount()
}

func (r *runner) GetAllCheckpoints() []*CheckpointEntry {
return r.store.GetAllCheckpoints()
}
Expand Down
Loading

0 comments on commit 63d4557

Please sign in to comment.