Skip to content

Commit

Permalink
fix: dump by len
Browse files Browse the repository at this point in the history
Signed-off-by: Rustin170506 <[email protected]>
  • Loading branch information
Rustin170506 committed Nov 24, 2024
1 parent 4172aa3 commit ae91973
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 0 deletions.
15 changes: 15 additions & 0 deletions pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ import (
"github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze"
"github.com/pingcap/tidb/pkg/statistics/handle/initstats"
statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil"
"github.com/pingcap/tidb/pkg/statistics/handle/usage"
handleutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/store"
"github.com/pingcap/tidb/pkg/store/helper"
Expand Down Expand Up @@ -2585,6 +2586,7 @@ func (do *Domain) updateStatsWorker(_ sessionctx.Context) {
// We need to have different nodes trigger tasks at different times to avoid the herd effect.
randDuration := time.Duration(rand.Int63n(int64(time.Minute)))
deltaUpdateTicker := time.NewTicker(20*lease + randDuration)
tableDeltaLenTicker := time.NewTicker(500 * time.Millisecond)
gcStatsTicker := time.NewTicker(100 * lease)
dumpColStatsUsageTicker := time.NewTicker(100 * lease)
updateStatsHealthyTicker := time.NewTicker(20 * lease)
Expand All @@ -2594,6 +2596,7 @@ func (do *Domain) updateStatsWorker(_ sessionctx.Context) {
dumpColStatsUsageTicker.Stop()
gcStatsTicker.Stop()
deltaUpdateTicker.Stop()
tableDeltaLenTicker.Stop()
readMemTicker.Stop()
updateStatsHealthyTicker.Stop()
do.SetStatsUpdating(false)
Expand All @@ -2611,6 +2614,18 @@ func (do *Domain) updateStatsWorker(_ sessionctx.Context) {
if err != nil {
logutil.BgLogger().Debug("dump stats delta failed", zap.Error(err))
}
case <-tableDeltaLenTicker.C:
tableDeltaLen := usage.GetGlobalTableCount()
if tableDeltaLen >= 1000 {
logutil.BgLogger().Info("table delta length", zap.Int32("length", tableDeltaLen))
start := time.Now()
err := statsHandle.DumpStatsDeltaToKV(false)
if err != nil {
logutil.BgLogger().Debug("dump stats delta failed", zap.Error(err))
}
logutil.BgLogger().Info("dump stats delta to kv", zap.Duration("take time", time.Since(start)))
usage.ResetGlobalTableCount()
}
case <-gcStatsTicker.C:
if !do.statsOwner.IsOwner() {
continue
Expand Down
3 changes: 3 additions & 0 deletions pkg/statistics/handle/types/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ type StatsUsage interface {
// DumpStatsDeltaToKV sweeps the whole list and updates the global map, then we dumps every table that held in map to KV.
DumpStatsDeltaToKV(dumpAll bool) error

// TableDeltaLen returns the number of tables in the SessionTableDelta.
TableDeltaLen() int

// DumpColStatsUsageToKV sweeps the whole list, updates the column stats usage map and dumps it to KV.
DumpColStatsUsageToKV() error
}
Expand Down
28 changes: 28 additions & 0 deletions pkg/statistics/handle/usage/session_stats_collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"slices"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/errors"
Expand All @@ -46,6 +47,18 @@ var (
batchInsertSize = 10
)

var globalTableCount int32

// GetGlobalTableCount returns the global table count.
func GetGlobalTableCount() int32 {
return atomic.LoadInt32(&globalTableCount)
}

// ResetGlobalTableCount resets the global table count.
func ResetGlobalTableCount() {
atomic.StoreInt32(&globalTableCount, 0)
}

// needDumpStatsDelta checks whether to dump stats delta.
// 1. If the table doesn't exist or is a mem table or system table, then return false.
// 2. If the mode is DumpAll, then return true.
Expand Down Expand Up @@ -125,6 +138,11 @@ func (s *statsUsageImpl) DumpStatsDeltaToKV(dumpAll bool) error {
})
}

// TableDeltaLen returns the number of tables in the SessionTableDelta.
func (s *statsUsageImpl) TableDeltaLen() int {
return s.SessionTableDelta().Len()
}

// dumpTableStatDeltaToKV dumps a single delta with some table to KV and updates the version.
// For a partitioned table, we will update its global-stats as well.
func (s *statsUsageImpl) dumpTableStatCountToKV(is infoschema.InfoSchema, physicalTableID int64, delta variable.TableDelta) (updated bool, err error) {
Expand Down Expand Up @@ -448,6 +466,9 @@ func (m *TableDelta) GetDeltaAndReset() map[int64]variable.TableDelta {
func (m *TableDelta) Update(id int64, delta int64, count int64, colSize *map[int64]int64) {
m.lock.Lock()
defer m.lock.Unlock()
if _, exists := m.delta[id]; !exists {
atomic.AddInt32(&globalTableCount, 1)
}
UpdateTableDeltaMap(m.delta, id, delta, count, colSize)
}

Expand All @@ -463,6 +484,13 @@ func (m *TableDelta) Merge(deltaMap map[int64]variable.TableDelta) {
}
}

// Len returns the number of tables in the TableDelta.
func (m *TableDelta) Len() int {
m.lock.Lock()
defer m.lock.Unlock()
return len(m.delta)
}

// UpdateTableDeltaMap updates the delta of the table.
func UpdateTableDeltaMap(m map[int64]variable.TableDelta, id int64, delta int64, count int64, colSize *map[int64]int64) {
item := m[id]
Expand Down

0 comments on commit ae91973

Please sign in to comment.