diff --git a/pkg/sql/colexec/shuffleV2/shufflepool.go b/pkg/sql/colexec/shuffleV2/shufflepool.go index 4adcba97794df..dc2421c78f8bb 100644 --- a/pkg/sql/colexec/shuffleV2/shufflepool.go +++ b/pkg/sql/colexec/shuffleV2/shufflepool.go @@ -35,6 +35,8 @@ type shufflePoolStats struct { //for debug } func (sp *ShufflePoolV2) printStats() { + sp.statsLock.Lock() + defer sp.statsLock.Unlock() maxCNT := sp.stats.inputCNT[0] minCNT := sp.stats.inputCNT[0] for i := range sp.stats.inputCNT { @@ -60,6 +62,7 @@ type ShufflePoolV2 struct { stoppers int32 batches []*batch.Batch holderLock sync.Mutex + statsLock sync.Mutex batchLocks []sync.Mutex endingWaiters []chan bool batchWaiters []chan bool @@ -173,7 +176,9 @@ func (sp *ShufflePoolV2) getEndingBatch(buf *batch.Batch, shuffleIDX int32, proc bat = sp.batches[shuffleIDX] sp.batches[shuffleIDX] = nil if bat != nil { + sp.statsLock.Lock() sp.stats.outputCNT[shuffleIDX] += int64(bat.RowCount()) + sp.statsLock.Unlock() } return bat case <-proc.Ctx.Done(): @@ -199,7 +204,9 @@ func (sp *ShufflePoolV2) getFullBatch(buf *batch.Batch, shuffleIDX int32) *batch buf.ShuffleIDX = bat.ShuffleIDX } sp.batches[shuffleIDX] = buf + sp.statsLock.Lock() sp.stats.outputCNT[shuffleIDX] += int64(bat.RowCount()) + sp.statsLock.Unlock() return bat } @@ -225,10 +232,12 @@ func (sp *ShufflePoolV2) putAllBatchIntoPoolByShuffleIdx(srcBatch *batch.Batch, if err != nil { return err } + sp.statsLock.Lock() if sp.batches[shuffleIDX].RowCount() > sp.stats.maxBatchCNT { sp.stats.maxBatchCNT = sp.batches[shuffleIDX].RowCount() } sp.stats.inputCNT[shuffleIDX] += int64(srcBatch.RowCount()) + sp.statsLock.Unlock() if sp.batches[shuffleIDX].RowCount() >= colexec.DefaultBatchSize && len(sp.batchWaiters[shuffleIDX]) == 0 { sp.batchWaiters[shuffleIDX] <- true } @@ -256,11 +265,13 @@ func (sp *ShufflePoolV2) putBatchIntoShuffledPoolsBySels(srcBatch *batch.Batch, return err } } + sp.statsLock.Lock() sp.stats.inputCNT[i] += int64(len(currentSels)) - bat.AddRowCount(len(currentSels)) if bat.RowCount() > sp.stats.maxBatchCNT { sp.stats.maxBatchCNT = bat.RowCount() } + sp.statsLock.Unlock() + bat.AddRowCount(len(currentSels)) if bat.RowCount() >= colexec.DefaultBatchSize && len(sp.batchWaiters[i]) == 0 { sp.batchWaiters[i] <- true } @@ -273,7 +284,9 @@ func (sp *ShufflePoolV2) putBatchIntoShuffledPoolsBySels(srcBatch *batch.Batch, func (sp *ShufflePoolV2) statsDirectlySentBatch(srcBatch *batch.Batch) { rows := int64(srcBatch.RowCount()) + sp.statsLock.Lock() sp.stats.inputCNT[srcBatch.ShuffleIDX] += rows sp.stats.outputCNT[srcBatch.ShuffleIDX] += rows sp.stats.directRows += rows + sp.statsLock.Unlock() }