From f767c186da1c3f9852b50b72abc83b78cc681c73 Mon Sep 17 00:00:00 2001 From: nitao Date: Thu, 17 Oct 2024 05:37:28 +0800 Subject: [PATCH] tp query can run in single goroutine when there's no merge operator (#19375) tp query can run in single goroutine when there's no merge operator Approved by: @ouyuanning --- pkg/sql/compile/compile.go | 19 ++++++++++++++++++- pkg/sql/compile/scope.go | 14 +++++++++++++- pkg/sql/compile/types.go | 2 ++ 3 files changed, 33 insertions(+), 2 deletions(-) diff --git a/pkg/sql/compile/compile.go b/pkg/sql/compile/compile.go index c0aff3b2fb939..3cbda2ed2f74c 100644 --- a/pkg/sql/compile/compile.go +++ b/pkg/sql/compile/compile.go @@ -234,6 +234,7 @@ func (c *Compile) clear() { c.needLockMeta = false c.isInternal = false c.isPrepare = false + c.hasMergeOp = false c.needBlock = false for _, exe := range c.filterExprExes { @@ -449,9 +450,10 @@ func (c *Compile) prePipelineInitializer() (err error) { // run once func (c *Compile) runOnce() (err error) { + //c.printPipeline() var wg sync.WaitGroup - if c.execType == plan2.ExecTypeTP && len(c.scopes) == 1 { + if c.IsTpQuery() && len(c.scopes) == 1 { if err = c.run(c.scopes[0]); err != nil { return err } @@ -1323,6 +1325,7 @@ func (c *Compile) constructLoadMergeScope() *Scope { ds.Proc = c.proc.NewNoContextChildProc(1) ds.Proc.Base.LoadTag = true arg := merge.NewArgument() + c.hasMergeOp = true arg.SetAnalyzeControl(c.anal.curNodeIdx, false) ds.setRootOperator(arg) @@ -2152,6 +2155,7 @@ func (c *Compile) compileTpMinusAndIntersect(left []*Scope, right []*Scope, node merge0 := rs[0].RootOp.(*merge.Merge) merge0.WithPartial(0, 1) merge1 := merge.NewArgument().WithPartial(1, 2) + c.hasMergeOp = true currentFirstFlag := c.anal.isFirst switch nodeType { @@ -2182,6 +2186,7 @@ func (c *Compile) compileMinusAndIntersect(n *plan.Node, left []*Scope, right [] rs := c.newScopeListOnCurrentCN(2, int(n.Stats.BlockNum)) rs = c.newScopeListForMinusAndIntersect(rs, left, right, n) + c.hasMergeOp = true currentFirstFlag := c.anal.isFirst switch nodeType { case plan.Node_MINUS: @@ -2258,6 +2263,7 @@ func (c *Compile) compileShuffleJoin(node, left, right *plan.Node, lefts, rights shuffleJoins := c.newShuffleJoinScopeList(lefts, rights, node) + c.hasMergeOp = true for i := range shuffleJoins { mergeOp := merge.NewArgument() mergeOp.SetAnalyzeControl(c.anal.curNodeIdx, false) @@ -2581,6 +2587,7 @@ func (c *Compile) compileBuildSideForBoradcastJoin(node *plan.Node, rs, buildSco bs.Proc.Reg.MergeReceivers = append(bs.Proc.Reg.MergeReceivers, w) mergeOp := merge.NewArgument() + c.hasMergeOp = true mergeOp.SetAnalyzeControl(c.anal.curNodeIdx, false) bs.setRootOperator(mergeOp) bs.setRootOperator(constructJoinBuildOperator(c, tmp[0].RootOp, int32(len(tmp)))) @@ -2603,6 +2610,7 @@ func (c *Compile) compileBuildSideForBoradcastJoin(node *plan.Node, rs, buildSco bs.Proc.Reg.MergeReceivers = append(bs.Proc.Reg.MergeReceivers, w) mergeOp := merge.NewArgument() + c.hasMergeOp = true mergeOp.SetAnalyzeControl(c.anal.curNodeIdx, false) bs.setRootOperator(mergeOp) bs.setRootOperator(constructJoinBuildOperator(c, rs[i].RootOp, int32(rs[i].NodeInfo.Mcpu))) @@ -2900,6 +2908,7 @@ func (c *Compile) compileFuzzyFilter(n *plan.Node, ns []*plan.Node, left []*Scop merge1 := rs.RootOp.(*merge.Merge) merge1.WithPartial(0, 1) merge2 := merge.NewArgument().WithPartial(1, 2) + c.hasMergeOp = true currentFirstFlag := c.anal.isFirst op := constructFuzzyFilter(n, ns[n.Children[0]], ns[n.Children[1]]) @@ -3152,6 +3161,7 @@ func (c *Compile) compileInsert(ns []*plan.Node, n *plan.Node, ss []*Scope) ([]* } parallelSize := c.getParallelSizeForExternalScan(n, ncpu) scopes := make([]*Scope, 0, parallelSize) + c.hasMergeOp = true for i := 0; i < parallelSize; i++ { s := c.newEmptyMergeScope() mergeArg := merge.NewArgument() @@ -3347,6 +3357,7 @@ func (c *Compile) compileRecursiveCte(n *plan.Node, curNodeIdx int32) ([]*Scope, mergecteArg.SetAnalyzeControl(c.anal.curNodeIdx, false) mergecteArg.AppendChild(mergeOp2) c.anal.isFirst = false + c.hasMergeOp = true return []*Scope{rs}, nil } @@ -3364,6 +3375,7 @@ func (c *Compile) compileRecursiveScan(n *plan.Node, curNodeIdx int32) ([]*Scope rs.Proc.Reg.MergeReceivers = receivers mergeOp := merge.NewArgument() + c.hasMergeOp = true rs.setRootOperator(mergeOp) currentFirstFlag := c.anal.isFirst mergeRecursiveArg := mergerecursive.NewArgument() @@ -3386,6 +3398,7 @@ func (c *Compile) compileSinkScanNode(n *plan.Node, curNodeIdx int32) ([]*Scope, currentFirstFlag := c.anal.isFirst mergeArg := merge.NewArgument().WithSinkScan(true) + c.hasMergeOp = true mergeArg.SetAnalyzeControl(c.anal.curNodeIdx, currentFirstFlag) rs.setRootOperator(mergeArg) c.anal.isFirst = false @@ -3451,6 +3464,7 @@ func (c *Compile) newDeleteMergeScope(arg *deletion.Deletion, ss []*Scope, n *pl mergeOp.SetAnalyzeControl(c.anal.curNodeIdx, false) rs[i].setRootOperator(mergeOp) } + c.hasMergeOp = true for i := 0; i < len(ss); i++ { dispatchArg := constructDispatch(i, rs, ss[i], n, false) @@ -3489,6 +3503,7 @@ func (c *Compile) newMergeScope(ss []*Scope) *Scope { // waring: `Merge` operator` is not used as an input/output analyze, // and `Merge` operator cannot play the role of IsFirst/IsLast mergeOp := merge.NewArgument() + c.hasMergeOp = true mergeOp.SetAnalyzeControl(c.anal.curNodeIdx, false) rs.setRootOperator(mergeOp) @@ -3532,6 +3547,7 @@ func (c *Compile) newMergeScopeByCN(ss []*Scope, nodeinfo engine.Node) *Scope { // waring: `Merge` operator` is not used as an input/output analyze, // and `Merge` operator cannot play the role of IsFirst/IsLast mergeOp := merge.NewArgument() + c.hasMergeOp = true mergeOp.SetAnalyzeControl(c.anal.curNodeIdx, false) rs.setRootOperator(mergeOp) for i := range ss { @@ -3563,6 +3579,7 @@ func (c *Compile) newScopeListWithNode(mcpu, childrenCount int, addr string) []* mergeOp.SetAnalyzeControl(c.anal.curNodeIdx, false) ss[i].setRootOperator(mergeOp) } + c.hasMergeOp = true //c.anal.isFirst = false return ss } diff --git a/pkg/sql/compile/scope.go b/pkg/sql/compile/scope.go index fb9ba093a8bf9..bd642eb4e6cf7 100644 --- a/pkg/sql/compile/scope.go +++ b/pkg/sql/compile/scope.go @@ -17,11 +17,12 @@ package compile import ( "context" "fmt" - "github.com/matrixorigin/matrixone/pkg/vm/engine/engine_util" "strings" "sync" "time" + "github.com/matrixorigin/matrixone/pkg/vm/engine/engine_util" + "github.com/matrixorigin/matrixone/pkg/catalog" "github.com/matrixorigin/matrixone/pkg/cnservice/cnclient" "github.com/matrixorigin/matrixone/pkg/common/moerr" @@ -230,6 +231,17 @@ func (s *Scope) SetOperatorInfoRecursively(cb func() int32) { // MergeRun range and run the scope's pre-scopes by go-routine, and finally run itself to do merge work. func (s *Scope) MergeRun(c *Compile) error { + if c.IsTpQuery() && !c.hasMergeOp { + lenPrescopes := len(s.PreScopes) + for i := lenPrescopes - 1; i >= 0; i-- { + err := s.PreScopes[i].MergeRun(c) + if err != nil { + return err + } + } + return s.ParallelRun(c) + } + var wg sync.WaitGroup preScopeResultReceiveChan := make(chan error, len(s.PreScopes)) for i := range s.PreScopes { diff --git a/pkg/sql/compile/types.go b/pkg/sql/compile/types.go index f8cf9c6e646bd..aeaeee8abeab0 100644 --- a/pkg/sql/compile/types.go +++ b/pkg/sql/compile/types.go @@ -308,6 +308,8 @@ type Compile struct { filterExprExes []colexec.ExpressionExecutor isPrepare bool + + hasMergeOp bool } type RemoteReceivRegInfo struct {