Skip to content

Commit

Permalink
fix data race in shuffle pool stats (#20641)
Browse files Browse the repository at this point in the history
fix data race in shuffle pool stats

Approved by: @sukki37
  • Loading branch information
badboynt1 authored Dec 8, 2024
1 parent ce59328 commit 9c6ab9e
Showing 1 changed file with 14 additions and 1 deletion.
15 changes: 14 additions & 1 deletion pkg/sql/colexec/shuffleV2/shufflepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type shufflePoolStats struct { //for debug
}

func (sp *ShufflePoolV2) printStats() {
sp.statsLock.Lock()
defer sp.statsLock.Unlock()
maxCNT := sp.stats.inputCNT[0]
minCNT := sp.stats.inputCNT[0]
for i := range sp.stats.inputCNT {
Expand All @@ -60,6 +62,7 @@ type ShufflePoolV2 struct {
stoppers int32
batches []*batch.Batch
holderLock sync.Mutex
statsLock sync.Mutex
batchLocks []sync.Mutex
endingWaiters []chan bool
batchWaiters []chan bool
Expand Down Expand Up @@ -173,7 +176,9 @@ func (sp *ShufflePoolV2) getEndingBatch(buf *batch.Batch, shuffleIDX int32, proc
bat = sp.batches[shuffleIDX]
sp.batches[shuffleIDX] = nil
if bat != nil {
sp.statsLock.Lock()
sp.stats.outputCNT[shuffleIDX] += int64(bat.RowCount())
sp.statsLock.Unlock()
}
return bat
case <-proc.Ctx.Done():
Expand All @@ -199,7 +204,9 @@ func (sp *ShufflePoolV2) getFullBatch(buf *batch.Batch, shuffleIDX int32) *batch
buf.ShuffleIDX = bat.ShuffleIDX
}
sp.batches[shuffleIDX] = buf
sp.statsLock.Lock()
sp.stats.outputCNT[shuffleIDX] += int64(bat.RowCount())
sp.statsLock.Unlock()
return bat
}

Expand All @@ -225,10 +232,12 @@ func (sp *ShufflePoolV2) putAllBatchIntoPoolByShuffleIdx(srcBatch *batch.Batch,
if err != nil {
return err
}
sp.statsLock.Lock()
if sp.batches[shuffleIDX].RowCount() > sp.stats.maxBatchCNT {
sp.stats.maxBatchCNT = sp.batches[shuffleIDX].RowCount()
}
sp.stats.inputCNT[shuffleIDX] += int64(srcBatch.RowCount())
sp.statsLock.Unlock()
if sp.batches[shuffleIDX].RowCount() >= colexec.DefaultBatchSize && len(sp.batchWaiters[shuffleIDX]) == 0 {
sp.batchWaiters[shuffleIDX] <- true
}
Expand Down Expand Up @@ -256,11 +265,13 @@ func (sp *ShufflePoolV2) putBatchIntoShuffledPoolsBySels(srcBatch *batch.Batch,
return err
}
}
sp.statsLock.Lock()
sp.stats.inputCNT[i] += int64(len(currentSels))
bat.AddRowCount(len(currentSels))
if bat.RowCount() > sp.stats.maxBatchCNT {
sp.stats.maxBatchCNT = bat.RowCount()
}
sp.statsLock.Unlock()
bat.AddRowCount(len(currentSels))
if bat.RowCount() >= colexec.DefaultBatchSize && len(sp.batchWaiters[i]) == 0 {
sp.batchWaiters[i] <- true
}
Expand All @@ -273,7 +284,9 @@ func (sp *ShufflePoolV2) putBatchIntoShuffledPoolsBySels(srcBatch *batch.Batch,

func (sp *ShufflePoolV2) statsDirectlySentBatch(srcBatch *batch.Batch) {
rows := int64(srcBatch.RowCount())
sp.statsLock.Lock()
sp.stats.inputCNT[srcBatch.ShuffleIDX] += rows
sp.stats.outputCNT[srcBatch.ShuffleIDX] += rows
sp.stats.directRows += rows
sp.statsLock.Unlock()
}

0 comments on commit 9c6ab9e

Please sign in to comment.