Skip to content

Commit

Permalink
fix: concurrently dump
Browse files Browse the repository at this point in the history
Signed-off-by: Rustin170506 <[email protected]>
  • Loading branch information
Rustin170506 committed Nov 23, 2024
1 parent 500ddb0 commit 39ca1d0
Showing 1 changed file with 43 additions and 15 deletions.
58 changes: 43 additions & 15 deletions pkg/statistics/handle/usage/session_stats_collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,29 +98,57 @@ func (s *statsUsageImpl) DumpStatsDeltaToKV(dumpAll bool) error {
return utilstats.CallWithSCtx(s.statsHandle.SPool(), func(sctx sessionctx.Context) error {
is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
currentTime := time.Now()

var wg sync.WaitGroup
errCh := make(chan error, len(deltaMap))
semaphore := make(chan struct{}, 8) // Limit concurrent goroutines

for id, item := range deltaMap {
if !s.needDumpStatsDelta(is, dumpAll, id, item, currentTime) {
continue
}
updated, err := s.dumpTableStatCountToKV(is, id, item)

wg.Add(1)
semaphore <- struct{}{} // Acquire
go func(id int64, item variable.TableDelta) {
defer func() {
<-semaphore // Release
wg.Done()
}()

updated, err := s.dumpTableStatCountToKV(is, id, item)
if err != nil {
errCh <- err
return
}
if updated {
UpdateTableDeltaMap(deltaMap, id, -item.Delta, -item.Count, nil)
}
if err = storage.DumpTableStatColSizeToKV(sctx, id, item); err != nil {
delete(deltaMap, id)
errCh <- err
return
}
if updated {
delete(deltaMap, id)
} else {
m := deltaMap[id]
m.ColSize = nil
deltaMap[id] = m
}
}(id, item)
}

wg.Wait()
close(errCh)

// Return first error if any occurred
for err := range errCh {
if err != nil {
return errors.Trace(err)
}
if updated {
UpdateTableDeltaMap(deltaMap, id, -item.Delta, -item.Count, nil)
}
if err = storage.DumpTableStatColSizeToKV(sctx, id, item); err != nil {
delete(deltaMap, id)
return errors.Trace(err)
}
if updated {
delete(deltaMap, id)
} else {
m := deltaMap[id]
m.ColSize = nil
deltaMap[id] = m
}
}

return nil
})
}
Expand Down

0 comments on commit 39ca1d0

Please sign in to comment.