Skip to content

Commit

Permalink
checker: use ttl cache (tikv#8505)
Browse files Browse the repository at this point in the history
close tikv#8500

Signed-off-by: Ryan Leung <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
rleungx and ti-chi-bot[bot] authored Aug 8, 2024
1 parent ccf8d66 commit ff71512
Show file tree
Hide file tree
Showing 12 changed files with 38 additions and 39 deletions.
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func (s *Service) AskBatchSplit(_ context.Context, request *schedulingpb.AskBatc
// If region splits during the scheduling process, regions with abnormal
// status may be left, and these regions need to be checked with higher
// priority.
c.GetCoordinator().GetCheckerController().AddPendingProcessedRegions(recordRegions...)
c.GetCoordinator().GetCheckerController().AddPendingProcessedRegions(false, recordRegions...)

return &schedulingpb.AskBatchSplitResponse{
Header: s.header(),
Expand Down
2 changes: 1 addition & 1 deletion pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -845,7 +845,7 @@ func (mc *Cluster) SetStoreLabel(storeID uint64, labels map[string]string) {
}

// AddPendingProcessedRegions mock method
func (mc *Cluster) AddPendingProcessedRegions(ids ...uint64) {
func (mc *Cluster) AddPendingProcessedRegions(_ bool, ids ...uint64) {
for _, id := range ids {
mc.pendingProcessedRegions[id] = struct{}{}
}
Expand Down
19 changes: 9 additions & 10 deletions pkg/schedule/checker/checker_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type Controller struct {
mergeChecker *MergeChecker
jointStateChecker *JointStateChecker
priorityInspector *PriorityInspector
pendingProcessedRegions cache.Cache
pendingProcessedRegions *cache.TTLUint64
suspectKeyRanges *cache.TTLString // suspect key-range regions that may need fix

// duration is the duration of the last patrol round.
Expand All @@ -88,7 +88,7 @@ type Controller struct {

// NewController create a new Controller.
func NewController(ctx context.Context, cluster sche.CheckerCluster, conf config.CheckerConfigProvider, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler, opController *operator.Controller) *Controller {
pendingProcessedRegions := cache.NewDefaultCache(DefaultPendingRegionCacheSize)
pendingProcessedRegions := cache.NewIDTTL(ctx, time.Minute, 3*time.Minute)
return &Controller{
ctx: ctx,
cluster: cluster,
Expand Down Expand Up @@ -311,7 +311,7 @@ func (c *Controller) tryAddOperators(region *core.RegionInfo) {
c.opController.AddWaitingOperator(ops...)
c.RemovePendingProcessedRegion(id)
} else {
c.AddPendingProcessedRegions(id)
c.AddPendingProcessedRegions(true, id)
}
}

Expand All @@ -327,16 +327,15 @@ func (c *Controller) GetRuleChecker() *RuleChecker {

// GetPendingProcessedRegions returns the pending processed regions in the cache.
func (c *Controller) GetPendingProcessedRegions() []uint64 {
pendingRegions := make([]uint64, 0)
for _, item := range c.pendingProcessedRegions.Elems() {
pendingRegions = append(pendingRegions, item.Key)
}
return pendingRegions
return c.pendingProcessedRegions.GetAllID()
}

// AddPendingProcessedRegions adds the pending processed region into the cache.
func (c *Controller) AddPendingProcessedRegions(ids ...uint64) {
func (c *Controller) AddPendingProcessedRegions(needCheckLen bool, ids ...uint64) {
for _, id := range ids {
if needCheckLen && c.pendingProcessedRegions.Len() > DefaultPendingRegionCacheSize {
return
}
c.pendingProcessedRegions.Put(id, nil)
}
}
Expand Down Expand Up @@ -385,7 +384,7 @@ func (c *Controller) CheckSuspectRanges() {
if lastRegion.GetEndKey() != nil && bytes.Compare(lastRegion.GetEndKey(), keyRange[1]) < 0 {
c.AddSuspectKeyRange(lastRegion.GetEndKey(), keyRange[1])
}
c.AddPendingProcessedRegions(regionIDList...)
c.AddPendingProcessedRegions(false, regionIDList...)
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/checker/replica_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ type ReplicaChecker struct {
PauseController
cluster sche.CheckerCluster
conf config.CheckerConfigProvider
pendingProcessedRegions cache.Cache
pendingProcessedRegions *cache.TTLUint64
}

// NewReplicaChecker creates a replica checker.
func NewReplicaChecker(cluster sche.CheckerCluster, conf config.CheckerConfigProvider, pendingProcessedRegions cache.Cache) *ReplicaChecker {
func NewReplicaChecker(cluster sche.CheckerCluster, conf config.CheckerConfigProvider, pendingProcessedRegions *cache.TTLUint64) *ReplicaChecker {
return &ReplicaChecker{
cluster: cluster,
conf: conf,
Expand Down
20 changes: 10 additions & 10 deletions pkg/schedule/checker/replica_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (suite *replicaCheckerTestSuite) SetupTest() {
suite.ctx, suite.cancel = context.WithCancel(context.Background())
suite.cluster = mockcluster.NewCluster(suite.ctx, cfg)
suite.cluster.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
suite.rc = NewReplicaChecker(suite.cluster, suite.cluster.GetCheckerConfig(), cache.NewDefaultCache(10))
suite.rc = NewReplicaChecker(suite.cluster, suite.cluster.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute))
stats := &pdpb.StoreStats{
Capacity: 100,
Available: 100,
Expand Down Expand Up @@ -213,7 +213,7 @@ func (suite *replicaCheckerTestSuite) TestBasic() {
tc := mockcluster.NewCluster(suite.ctx, opt)
tc.SetMaxSnapshotCount(2)
tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10))
rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute))

// Add stores 1,2,3,4.
tc.AddRegionStore(1, 4)
Expand Down Expand Up @@ -290,7 +290,7 @@ func (suite *replicaCheckerTestSuite) TestLostStore() {
tc.AddRegionStore(1, 1)
tc.AddRegionStore(2, 1)

rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10))
rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute))

// now region peer in store 1,2,3.but we just have store 1,2
// This happens only in recovering the PD tc
Expand All @@ -309,7 +309,7 @@ func (suite *replicaCheckerTestSuite) TestOffline() {
tc.SetMaxReplicas(3)
tc.SetLocationLabels([]string{"zone", "rack", "host"})

rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10))
rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute))
tc.AddLabelsStore(1, 1, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"})
tc.AddLabelsStore(2, 2, map[string]string{"zone": "z2", "rack": "r1", "host": "h1"})
tc.AddLabelsStore(3, 3, map[string]string{"zone": "z3", "rack": "r1", "host": "h1"})
Expand Down Expand Up @@ -361,7 +361,7 @@ func (suite *replicaCheckerTestSuite) TestDistinctScore() {
tc.SetMaxReplicas(3)
tc.SetLocationLabels([]string{"zone", "rack", "host"})

rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10))
rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute))

tc.AddLabelsStore(1, 9, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"})
tc.AddLabelsStore(2, 8, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"})
Expand Down Expand Up @@ -441,7 +441,7 @@ func (suite *replicaCheckerTestSuite) TestDistinctScore2() {
tc.SetMaxReplicas(5)
tc.SetLocationLabels([]string{"zone", "host"})

rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10))
rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute))

tc.AddLabelsStore(1, 1, map[string]string{"zone": "z1", "host": "h1"})
tc.AddLabelsStore(2, 1, map[string]string{"zone": "z1", "host": "h2"})
Expand Down Expand Up @@ -470,7 +470,7 @@ func (suite *replicaCheckerTestSuite) TestStorageThreshold() {
tc := mockcluster.NewCluster(suite.ctx, opt)
tc.SetLocationLabels([]string{"zone"})
tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10))
rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute))

tc.AddLabelsStore(1, 1, map[string]string{"zone": "z1"})
tc.UpdateStorageRatio(1, 0.5, 0.5)
Expand Down Expand Up @@ -506,7 +506,7 @@ func (suite *replicaCheckerTestSuite) TestOpts() {
opt := mockconfig.NewTestOptions()
tc := mockcluster.NewCluster(suite.ctx, opt)
tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10))
rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute))

tc.AddRegionStore(1, 100)
tc.AddRegionStore(2, 100)
Expand Down Expand Up @@ -539,7 +539,7 @@ func (suite *replicaCheckerTestSuite) TestFixDownPeer() {
tc := mockcluster.NewCluster(suite.ctx, opt)
tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
tc.SetLocationLabels([]string{"zone"})
rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10))
rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute))

tc.AddLabelsStore(1, 1, map[string]string{"zone": "z1"})
tc.AddLabelsStore(2, 1, map[string]string{"zone": "z1"})
Expand Down Expand Up @@ -571,7 +571,7 @@ func (suite *replicaCheckerTestSuite) TestFixOfflinePeer() {
tc := mockcluster.NewCluster(suite.ctx, opt)
tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
tc.SetLocationLabels([]string{"zone"})
rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10))
rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute))

tc.AddLabelsStore(1, 1, map[string]string{"zone": "z1"})
tc.AddLabelsStore(2, 1, map[string]string{"zone": "z1"})
Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/checker/rule_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,14 @@ type RuleChecker struct {
PauseController
cluster sche.CheckerCluster
ruleManager *placement.RuleManager
pendingProcessedRegions cache.Cache
pendingProcessedRegions *cache.TTLUint64
pendingList cache.Cache
switchWitnessCache *cache.TTLUint64
record *recorder
}

// NewRuleChecker creates a checker instance.
func NewRuleChecker(ctx context.Context, cluster sche.CheckerCluster, ruleManager *placement.RuleManager, pendingProcessedRegions cache.Cache) *RuleChecker {
func NewRuleChecker(ctx context.Context, cluster sche.CheckerCluster, ruleManager *placement.RuleManager, pendingProcessedRegions *cache.TTLUint64) *RuleChecker {
return &RuleChecker{
cluster: cluster,
ruleManager: ruleManager,
Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/checker/rule_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (suite *ruleCheckerTestSuite) SetupTest() {
suite.cluster.SetEnableWitness(true)
suite.cluster.SetEnableUseJointConsensus(false)
suite.ruleManager = suite.cluster.RuleManager
suite.rc = NewRuleChecker(suite.ctx, suite.cluster, suite.ruleManager, cache.NewDefaultCache(10))
suite.rc = NewRuleChecker(suite.ctx, suite.cluster, suite.ruleManager, cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute))
}

func (suite *ruleCheckerTestSuite) TearDownTest() {
Expand Down Expand Up @@ -1955,7 +1955,7 @@ func (suite *ruleCheckerTestAdvancedSuite) SetupTest() {
suite.cluster.SetEnableWitness(true)
suite.cluster.SetEnableUseJointConsensus(true)
suite.ruleManager = suite.cluster.RuleManager
suite.rc = NewRuleChecker(suite.ctx, suite.cluster, suite.ruleManager, cache.NewDefaultCache(10))
suite.rc = NewRuleChecker(suite.ctx, suite.cluster, suite.ruleManager, cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute))
}

func (suite *ruleCheckerTestAdvancedSuite) TearDownTest() {
Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1124,7 +1124,7 @@ func (h *Handler) AccelerateRegionsScheduleInRange(rawStartKey, rawEndKey string
for _, region := range regions {
regionsIDList = append(regionsIDList, region.GetID())
}
co.GetCheckerController().AddPendingProcessedRegions(regionsIDList...)
co.GetCheckerController().AddPendingProcessedRegions(false, regionsIDList...)
}
return nil
}
Expand All @@ -1151,7 +1151,7 @@ func (h *Handler) AccelerateRegionsScheduleInRanges(startKeys [][]byte, endKeys
for _, region := range regions {
regionsIDList = append(regionsIDList, region.GetID())
}
co.GetCheckerController().AddPendingProcessedRegions(regionsIDList...)
co.GetCheckerController().AddPendingProcessedRegions(false, regionsIDList...)
}
return nil
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/schedule/scatter/region_scatterer.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,12 @@ type RegionScatterer struct {
ordinaryEngine engineContext
specialEngines sync.Map
opController *operator.Controller
addSuspectRegions func(regionIDs ...uint64)
addSuspectRegions func(bool, ...uint64)
}

// NewRegionScatterer creates a region scatterer.
// RegionScatter is used for the `Lightning`, it will scatter the specified regions before import data.
func NewRegionScatterer(ctx context.Context, cluster sche.SharedCluster, opController *operator.Controller, addSuspectRegions func(regionIDs ...uint64)) *RegionScatterer {
func NewRegionScatterer(ctx context.Context, cluster sche.SharedCluster, opController *operator.Controller, addSuspectRegions func(bool, ...uint64)) *RegionScatterer {
return &RegionScatterer{
ctx: ctx,
name: regionScatterName,
Expand Down Expand Up @@ -275,7 +275,7 @@ func (r *RegionScatterer) scatterRegions(regions map[uint64]*core.RegionInfo, fa
// in a group level instead of cluster level.
func (r *RegionScatterer) Scatter(region *core.RegionInfo, group string, skipStoreLimit bool) (*operator.Operator, error) {
if !filter.IsRegionReplicated(r.cluster, region) {
r.addSuspectRegions(region.GetID())
r.addSuspectRegions(false, region.GetID())
scatterSkipNotReplicatedCounter.Inc()
log.Warn("region not replicated during scatter", zap.Uint64("region-id", region.GetID()))
return nil, errors.Errorf("region %d is not fully replicated", region.GetID())
Expand Down
6 changes: 3 additions & 3 deletions pkg/schedule/splitter/region_splitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ func NewSplitRegionsHandler(cluster sche.ClusterInformer, oc *operator.Controlle
type RegionSplitter struct {
cluster sche.ClusterInformer
handler SplitRegionsHandler
addSuspectRegions func(ids ...uint64)
addSuspectRegions func(bool, ...uint64)
}

// NewRegionSplitter return a region splitter
func NewRegionSplitter(cluster sche.ClusterInformer, handler SplitRegionsHandler, addSuspectRegions func(ids ...uint64)) *RegionSplitter {
func NewRegionSplitter(cluster sche.ClusterInformer, handler SplitRegionsHandler, addSuspectRegions func(bool, ...uint64)) *RegionSplitter {
return &RegionSplitter{
cluster: cluster,
handler: handler,
Expand Down Expand Up @@ -173,7 +173,7 @@ func (r *RegionSplitter) groupKeysByRegion(keys [][]byte) map[uint64]*regionGrou

func (r *RegionSplitter) checkRegionValid(region *core.RegionInfo) bool {
if !filter.IsRegionReplicated(r.cluster, region) {
r.addSuspectRegions(region.GetID())
r.addSuspectRegions(false, region.GetID())
return false
}
if region.GetLeader() == nil {
Expand Down
2 changes: 1 addition & 1 deletion server/cluster/cluster_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (c *RaftCluster) HandleAskBatchSplit(request *pdpb.AskBatchSplitRequest) (*
// If region splits during the scheduling process, regions with abnormal
// status may be left, and these regions need to be checked with higher
// priority.
c.AddPendingProcessedRegions(recordRegions...)
c.AddPendingProcessedRegions(false, recordRegions...)

resp := &pdpb.AskBatchSplitResponse{Ids: splitIDs}

Expand Down
4 changes: 2 additions & 2 deletions server/cluster/scheduling_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,10 +404,10 @@ func (sc *schedulingController) PauseOrResumeChecker(name string, t int64) error
}

// AddPendingProcessedRegions adds regions to suspect list.
func (sc *schedulingController) AddPendingProcessedRegions(regionIDs ...uint64) {
func (sc *schedulingController) AddPendingProcessedRegions(needCheckLen bool, regionIDs ...uint64) {
sc.mu.RLock()
defer sc.mu.RUnlock()
sc.coordinator.GetCheckerController().AddPendingProcessedRegions(regionIDs...)
sc.coordinator.GetCheckerController().AddPendingProcessedRegions(needCheckLen, regionIDs...)
}

// GetPendingProcessedRegions gets all suspect regions.
Expand Down

0 comments on commit ff71512

Please sign in to comment.