diff --git a/pkg/statistics/handle/usage/session_stats_collect.go b/pkg/statistics/handle/usage/session_stats_collect.go index 22c4a192a9132..5b7293c21f2e7 100644 --- a/pkg/statistics/handle/usage/session_stats_collect.go +++ b/pkg/statistics/handle/usage/session_stats_collect.go @@ -98,29 +98,57 @@ func (s *statsUsageImpl) DumpStatsDeltaToKV(dumpAll bool) error { return utilstats.CallWithSCtx(s.statsHandle.SPool(), func(sctx sessionctx.Context) error { is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema) currentTime := time.Now() + + var wg sync.WaitGroup + errCh := make(chan error, len(deltaMap)) + semaphore := make(chan struct{}, 8) // Limit concurrent goroutines + for id, item := range deltaMap { if !s.needDumpStatsDelta(is, dumpAll, id, item, currentTime) { continue } - updated, err := s.dumpTableStatCountToKV(is, id, item) + + wg.Add(1) + semaphore <- struct{}{} // Acquire + go func(id int64, item variable.TableDelta) { + defer func() { + <-semaphore // Release + wg.Done() + }() + + updated, err := s.dumpTableStatCountToKV(is, id, item) + if err != nil { + errCh <- err + return + } + if updated { + UpdateTableDeltaMap(deltaMap, id, -item.Delta, -item.Count, nil) + } + if err = storage.DumpTableStatColSizeToKV(sctx, id, item); err != nil { + delete(deltaMap, id) + errCh <- err + return + } + if updated { + delete(deltaMap, id) + } else { + m := deltaMap[id] + m.ColSize = nil + deltaMap[id] = m + } + }(id, item) + } + + wg.Wait() + close(errCh) + + // Return first error if any occurred + for err := range errCh { if err != nil { return errors.Trace(err) } - if updated { - UpdateTableDeltaMap(deltaMap, id, -item.Delta, -item.Count, nil) - } - if err = storage.DumpTableStatColSizeToKV(sctx, id, item); err != nil { - delete(deltaMap, id) - return errors.Trace(err) - } - if updated { - delete(deltaMap, id) - } else { - m := deltaMap[id] - m.ColSize = nil - deltaMap[id] = m - } } + return nil }) }