Skip to content

Commit

Permalink
tp query can run in single goroutine when there's no merge operator (#…
Browse files Browse the repository at this point in the history
…19375)

tp query can run in single goroutine when there's no merge operator

Approved by: @ouyuanning
  • Loading branch information
badboynt1 authored Oct 16, 2024
1 parent 418d313 commit f767c18
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 2 deletions.
19 changes: 18 additions & 1 deletion pkg/sql/compile/compile.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ func (c *Compile) clear() {
c.needLockMeta = false
c.isInternal = false
c.isPrepare = false
c.hasMergeOp = false
c.needBlock = false

for _, exe := range c.filterExprExes {
Expand Down Expand Up @@ -449,9 +450,10 @@ func (c *Compile) prePipelineInitializer() (err error) {

// run once
func (c *Compile) runOnce() (err error) {
//c.printPipeline()
var wg sync.WaitGroup

if c.execType == plan2.ExecTypeTP && len(c.scopes) == 1 {
if c.IsTpQuery() && len(c.scopes) == 1 {
if err = c.run(c.scopes[0]); err != nil {
return err
}
Expand Down Expand Up @@ -1323,6 +1325,7 @@ func (c *Compile) constructLoadMergeScope() *Scope {
ds.Proc = c.proc.NewNoContextChildProc(1)
ds.Proc.Base.LoadTag = true
arg := merge.NewArgument()
c.hasMergeOp = true
arg.SetAnalyzeControl(c.anal.curNodeIdx, false)

ds.setRootOperator(arg)
Expand Down Expand Up @@ -2152,6 +2155,7 @@ func (c *Compile) compileTpMinusAndIntersect(left []*Scope, right []*Scope, node
merge0 := rs[0].RootOp.(*merge.Merge)
merge0.WithPartial(0, 1)
merge1 := merge.NewArgument().WithPartial(1, 2)
c.hasMergeOp = true

currentFirstFlag := c.anal.isFirst
switch nodeType {
Expand Down Expand Up @@ -2182,6 +2186,7 @@ func (c *Compile) compileMinusAndIntersect(n *plan.Node, left []*Scope, right []
rs := c.newScopeListOnCurrentCN(2, int(n.Stats.BlockNum))
rs = c.newScopeListForMinusAndIntersect(rs, left, right, n)

c.hasMergeOp = true
currentFirstFlag := c.anal.isFirst
switch nodeType {
case plan.Node_MINUS:
Expand Down Expand Up @@ -2258,6 +2263,7 @@ func (c *Compile) compileShuffleJoin(node, left, right *plan.Node, lefts, rights

shuffleJoins := c.newShuffleJoinScopeList(lefts, rights, node)

c.hasMergeOp = true
for i := range shuffleJoins {
mergeOp := merge.NewArgument()
mergeOp.SetAnalyzeControl(c.anal.curNodeIdx, false)
Expand Down Expand Up @@ -2581,6 +2587,7 @@ func (c *Compile) compileBuildSideForBoradcastJoin(node *plan.Node, rs, buildSco
bs.Proc.Reg.MergeReceivers = append(bs.Proc.Reg.MergeReceivers, w)

mergeOp := merge.NewArgument()
c.hasMergeOp = true
mergeOp.SetAnalyzeControl(c.anal.curNodeIdx, false)
bs.setRootOperator(mergeOp)
bs.setRootOperator(constructJoinBuildOperator(c, tmp[0].RootOp, int32(len(tmp))))
Expand All @@ -2603,6 +2610,7 @@ func (c *Compile) compileBuildSideForBoradcastJoin(node *plan.Node, rs, buildSco
bs.Proc.Reg.MergeReceivers = append(bs.Proc.Reg.MergeReceivers, w)

mergeOp := merge.NewArgument()
c.hasMergeOp = true
mergeOp.SetAnalyzeControl(c.anal.curNodeIdx, false)
bs.setRootOperator(mergeOp)
bs.setRootOperator(constructJoinBuildOperator(c, rs[i].RootOp, int32(rs[i].NodeInfo.Mcpu)))
Expand Down Expand Up @@ -2900,6 +2908,7 @@ func (c *Compile) compileFuzzyFilter(n *plan.Node, ns []*plan.Node, left []*Scop
merge1 := rs.RootOp.(*merge.Merge)
merge1.WithPartial(0, 1)
merge2 := merge.NewArgument().WithPartial(1, 2)
c.hasMergeOp = true

currentFirstFlag := c.anal.isFirst
op := constructFuzzyFilter(n, ns[n.Children[0]], ns[n.Children[1]])
Expand Down Expand Up @@ -3152,6 +3161,7 @@ func (c *Compile) compileInsert(ns []*plan.Node, n *plan.Node, ss []*Scope) ([]*
}
parallelSize := c.getParallelSizeForExternalScan(n, ncpu)
scopes := make([]*Scope, 0, parallelSize)
c.hasMergeOp = true
for i := 0; i < parallelSize; i++ {
s := c.newEmptyMergeScope()
mergeArg := merge.NewArgument()
Expand Down Expand Up @@ -3347,6 +3357,7 @@ func (c *Compile) compileRecursiveCte(n *plan.Node, curNodeIdx int32) ([]*Scope,
mergecteArg.SetAnalyzeControl(c.anal.curNodeIdx, false)
mergecteArg.AppendChild(mergeOp2)
c.anal.isFirst = false
c.hasMergeOp = true

return []*Scope{rs}, nil
}
Expand All @@ -3364,6 +3375,7 @@ func (c *Compile) compileRecursiveScan(n *plan.Node, curNodeIdx int32) ([]*Scope
rs.Proc.Reg.MergeReceivers = receivers

mergeOp := merge.NewArgument()
c.hasMergeOp = true
rs.setRootOperator(mergeOp)
currentFirstFlag := c.anal.isFirst
mergeRecursiveArg := mergerecursive.NewArgument()
Expand All @@ -3386,6 +3398,7 @@ func (c *Compile) compileSinkScanNode(n *plan.Node, curNodeIdx int32) ([]*Scope,

currentFirstFlag := c.anal.isFirst
mergeArg := merge.NewArgument().WithSinkScan(true)
c.hasMergeOp = true
mergeArg.SetAnalyzeControl(c.anal.curNodeIdx, currentFirstFlag)
rs.setRootOperator(mergeArg)
c.anal.isFirst = false
Expand Down Expand Up @@ -3451,6 +3464,7 @@ func (c *Compile) newDeleteMergeScope(arg *deletion.Deletion, ss []*Scope, n *pl
mergeOp.SetAnalyzeControl(c.anal.curNodeIdx, false)
rs[i].setRootOperator(mergeOp)
}
c.hasMergeOp = true

for i := 0; i < len(ss); i++ {
dispatchArg := constructDispatch(i, rs, ss[i], n, false)
Expand Down Expand Up @@ -3489,6 +3503,7 @@ func (c *Compile) newMergeScope(ss []*Scope) *Scope {
// waring: `Merge` operator` is not used as an input/output analyze,
// and `Merge` operator cannot play the role of IsFirst/IsLast
mergeOp := merge.NewArgument()
c.hasMergeOp = true
mergeOp.SetAnalyzeControl(c.anal.curNodeIdx, false)
rs.setRootOperator(mergeOp)

Expand Down Expand Up @@ -3532,6 +3547,7 @@ func (c *Compile) newMergeScopeByCN(ss []*Scope, nodeinfo engine.Node) *Scope {
// waring: `Merge` operator` is not used as an input/output analyze,
// and `Merge` operator cannot play the role of IsFirst/IsLast
mergeOp := merge.NewArgument()
c.hasMergeOp = true
mergeOp.SetAnalyzeControl(c.anal.curNodeIdx, false)
rs.setRootOperator(mergeOp)
for i := range ss {
Expand Down Expand Up @@ -3563,6 +3579,7 @@ func (c *Compile) newScopeListWithNode(mcpu, childrenCount int, addr string) []*
mergeOp.SetAnalyzeControl(c.anal.curNodeIdx, false)
ss[i].setRootOperator(mergeOp)
}
c.hasMergeOp = true
//c.anal.isFirst = false
return ss
}
Expand Down
14 changes: 13 additions & 1 deletion pkg/sql/compile/scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ package compile
import (
"context"
"fmt"
"github.com/matrixorigin/matrixone/pkg/vm/engine/engine_util"
"strings"
"sync"
"time"

"github.com/matrixorigin/matrixone/pkg/vm/engine/engine_util"

"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/cnservice/cnclient"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
Expand Down Expand Up @@ -230,6 +231,17 @@ func (s *Scope) SetOperatorInfoRecursively(cb func() int32) {

// MergeRun range and run the scope's pre-scopes by go-routine, and finally run itself to do merge work.
func (s *Scope) MergeRun(c *Compile) error {
if c.IsTpQuery() && !c.hasMergeOp {
lenPrescopes := len(s.PreScopes)
for i := lenPrescopes - 1; i >= 0; i-- {
err := s.PreScopes[i].MergeRun(c)
if err != nil {
return err
}
}
return s.ParallelRun(c)
}

var wg sync.WaitGroup
preScopeResultReceiveChan := make(chan error, len(s.PreScopes))
for i := range s.PreScopes {
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/compile/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,8 @@ type Compile struct {
filterExprExes []colexec.ExpressionExecutor

isPrepare bool

hasMergeOp bool
}

type RemoteReceivRegInfo struct {
Expand Down

0 comments on commit f767c18

Please sign in to comment.