From a0b186196d913172d8006a10dc5d357c9b91e7f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9s=20Taylor?= Date: Mon, 14 Oct 2024 13:36:43 +0200 Subject: [PATCH] backport 16384 to release-18 (#16948) Signed-off-by: Andres Taylor Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com> --- .../queries/aggregation/aggregation_test.go | 8 +- .../planbuilder/operators/SQL_builder.go | 7 ++ .../operators/horizon_expanding.go | 78 ++++++++++++++- .../planbuilder/operators/queryprojection.go | 5 +- .../planbuilder/testdata/aggr_cases.json | 94 ++++++++++++++++++- 5 files changed, 183 insertions(+), 9 deletions(-) diff --git a/go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go b/go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go index 39afb422fb4..9c8c875c0b4 100644 --- a/go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go +++ b/go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go @@ -390,9 +390,15 @@ func TestOrderByCount(t *testing.T) { defer closer() mcmp.Exec("insert into t9(id1, id2, id3) values(1, '1', '1'), (2, '2', '2'), (3, '2', '2'), (4, '3', '3'), (5, '3', '3'), (6, '3', '3')") + mcmp.Exec("insert into t1(t1_id, `name`, `value`, shardkey) values(1,'a1','foo',100), (2,'b1','foo',200), (3,'c1','foo',300), (4,'a1','foo',100), (5,'b1','bar',200)") mcmp.Exec("SELECT t9.id2 FROM t9 GROUP BY t9.id2 ORDER BY COUNT(t9.id2) DESC") - mcmp.Exec("select COUNT(*) from (select 1 as one FROM t9 WHERE id3 = 3 ORDER BY id1 DESC LIMIT 3 OFFSET 0) subquery_for_count") + version, err := cluster.GetMajorVersion("vtgate") + require.NoError(t, err) + if version == 18 { + mcmp.Exec("select COUNT(*) from (select 1 as one FROM t9 WHERE id3 = 3 ORDER BY id1 DESC LIMIT 3 OFFSET 0) subquery_for_count") + mcmp.Exec("select t.id1, t1.name, t.leCount from (select id1, count(*) as leCount from t9 group by 1 order by 2 desc limit 20) t join t1 on t.id1 = t1.t1_id") + } } func TestAggregateAnyValue(t *testing.T) { diff 
--git a/go/vt/vtgate/planbuilder/operators/SQL_builder.go b/go/vt/vtgate/planbuilder/operators/SQL_builder.go index d8e301b86ef..9a55ddef1de 100644 --- a/go/vt/vtgate/planbuilder/operators/SQL_builder.go +++ b/go/vt/vtgate/planbuilder/operators/SQL_builder.go @@ -464,6 +464,13 @@ func buildAggregation(op *Aggregator, qb *queryBuilder) error { qb.addGroupBy(weightStringFor(simplified)) } } + if op.DT != nil { + sel := qb.asSelectStatement() + qb.stmt = nil + qb.addTableExpr(op.DT.Alias, op.DT.Alias, TableID(op), &sqlparser.DerivedTable{ + Select: sel, + }, nil, op.DT.Columns) + } return nil } diff --git a/go/vt/vtgate/planbuilder/operators/horizon_expanding.go b/go/vt/vtgate/planbuilder/operators/horizon_expanding.go index f55da7578bd..a2f0799c547 100644 --- a/go/vt/vtgate/planbuilder/operators/horizon_expanding.go +++ b/go/vt/vtgate/planbuilder/operators/horizon_expanding.go @@ -88,7 +88,7 @@ func expandSelectHorizon(ctx *plancontext.PlanningContext, horizon *Horizon, sel // if we are dealing with a derived table, we need to make sure that the ordering columns // are available outside the derived table for _, order := range horizon.Query.GetOrderBy() { - qp.addColumn(ctx, order.Expr) + qp.addDerivedColumn(ctx, order.Expr) } } @@ -121,9 +121,9 @@ func expandSelectHorizon(ctx *plancontext.PlanningContext, horizon *Horizon, sel } if len(qp.OrderExprs) > 0 { - op = &Ordering{ - Source: op, - Order: qp.OrderExprs, + op, err = expandOrderBy(ctx, op, qp, horizon.Alias) + if err != nil { + return nil, nil, err } extracted = append(extracted, "Ordering") } @@ -139,6 +139,76 @@ func expandSelectHorizon(ctx *plancontext.PlanningContext, horizon *Horizon, sel return op, rewrite.NewTree(fmt.Sprintf("expand SELECT horizon into (%s)", strings.Join(extracted, ", ")), op), nil } +func expandOrderBy(ctx *plancontext.PlanningContext, op ops.Operator, qp *QueryProjection, derived string) (ops.Operator, error) { + var newOrder []ops.OrderBy + sqc := &SubQueryBuilder{} + proj, ok := 
op.(*Projection) + + for _, expr := range qp.OrderExprs { + // Attempt to extract any subqueries within the expression + newExpr, subqs, err := sqc.pullOutValueSubqueries(ctx, expr.SimplifiedExpr, TableID(op), false) + if err != nil { + return nil, err + } + if newExpr == nil { + // If no subqueries are found, retain the original order expression + if derived != "" { + expr = exposeOrderingColumn(ctx, qp, expr, derived) + } + newOrder = append(newOrder, expr) + continue + } + + // If the operator is not a projection, we cannot handle subqueries with aggregation + if !ok { + return nil, vterrors.VT12001("subquery with aggregation in order by") + } + + // Add the new subquery expression to the projection + if err := proj.addSubqueryExpr(aeWrap(newExpr), newExpr, subqs...); err != nil { + return nil, err + } + // Replace the original order expression with the new expression containing subqueries + newOrder = append(newOrder, ops.OrderBy{ + Inner: &sqlparser.Order{ + Expr: newExpr, + Direction: expr.Inner.Direction, + }, + SimplifiedExpr: newExpr, + }) + } + + // Update the source of the projection if we have it + if proj != nil { + proj.Source = sqc.getRootOperator(proj.Source) + } + + // Return the updated operator with the new order by expressions + return &Ordering{ + Source: op, + Order: newOrder, + }, nil +} + +// exposeOrderingColumn will expose the ordering column to the outer query +func exposeOrderingColumn(ctx *plancontext.PlanningContext, qp *QueryProjection, orderBy ops.OrderBy, derived string) ops.OrderBy { + for _, se := range qp.SelectExprs { + aliasedExpr, err := se.GetAliasedExpr() + if err != nil { + panic(vterrors.VT13001("unexpected expression in select")) + } + if ctx.SemTable.EqualsExprWithDeps(aliasedExpr.Expr, orderBy.SimplifiedExpr) { + newExpr := sqlparser.NewColNameWithQualifier(aliasedExpr.ColumnName(), sqlparser.NewTableName(derived)) + ctx.SemTable.CopySemanticInfo(orderBy.SimplifiedExpr, newExpr) + orderBy.SimplifiedExpr = newExpr + 
orderBy.Inner = &sqlparser.Order{Expr: newExpr, Direction: orderBy.Inner.Direction} + break + } + } + + return orderBy +} + func createProjectionFromSelect(ctx *plancontext.PlanningContext, horizon *Horizon) (out ops.Operator, err error) { qp, err := horizon.getQP(ctx) if err != nil { diff --git a/go/vt/vtgate/planbuilder/operators/queryprojection.go b/go/vt/vtgate/planbuilder/operators/queryprojection.go index bf7401ad68d..39e76a5c1db 100644 --- a/go/vt/vtgate/planbuilder/operators/queryprojection.go +++ b/go/vt/vtgate/planbuilder/operators/queryprojection.go @@ -807,8 +807,9 @@ func (qp *QueryProjection) useGroupingOverDistinct(ctx *plancontext.PlanningCont return true, nil } -// addColumn adds a column to the QueryProjection if it is not already present -func (qp *QueryProjection) addColumn(ctx *plancontext.PlanningContext, expr sqlparser.Expr) { +// addDerivedColumn adds a column to the QueryProjection if it is not already present. +// It will use a column name that is available on the outside of the derived table +func (qp *QueryProjection) addDerivedColumn(ctx *plancontext.PlanningContext, expr sqlparser.Expr) { for _, selectExpr := range qp.SelectExprs { getExpr, err := selectExpr.GetExpr() if err != nil { diff --git a/go/vt/vtgate/planbuilder/testdata/aggr_cases.json b/go/vt/vtgate/planbuilder/testdata/aggr_cases.json index eb2e4042a94..127dd2c4837 100644 --- a/go/vt/vtgate/planbuilder/testdata/aggr_cases.json +++ b/go/vt/vtgate/planbuilder/testdata/aggr_cases.json @@ -716,6 +716,58 @@ ] } }, + { + "comment": "Aggregation with derived table", + "query": "select u.id, u.name, t.num_segments from (select id, count(*) as num_segments from user group by 1 order by 2 desc limit 20) t join unsharded u on u.id = t.id", + "plan": { + "QueryType": "SELECT", + "Original": "select u.id, u.name, t.num_segments from (select id, count(*) as num_segments from user group by 1 order by 2 desc limit 20) t join unsharded u on u.id = t.id", + "Instructions": { + "OperatorType":
"Join", + "Variant": "Join", + "JoinColumnIndexes": "R:0,R:1,L:0", + "JoinVars": { + "t_id": 1 + }, + "TableName": "`user`_unsharded", + "Inputs": [ + { + "OperatorType": "Limit", + "Count": "INT64(20)", + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "Scatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "FieldQuery": "select t.num_segments, t.id from (select id, count(*) as num_segments from `user` where 1 != 1 group by id) as t where 1 != 1", + "OrderBy": "0 DESC", + "Query": "select t.num_segments, t.id from (select id, count(*) as num_segments from `user` group by id) as t order by t.num_segments desc limit :__upper_limit", + "Table": "`user`" + } + ] + }, + { + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "main", + "Sharded": false + }, + "FieldQuery": "select u.id, u.`name` from unsharded as u where 1 != 1", + "Query": "select u.id, u.`name` from unsharded as u where u.id = :t_id", + "Table": "unsharded" + } + ] + }, + "TablesUsed": [ + "main.unsharded", + "user.user" + ] + } + }, { "comment": "scatter aggregate multiple group by (numbers)", "query": "select a, b, count(*) from user group by 2, 1", @@ -3540,7 +3592,7 @@ }, "FieldQuery": "select id, val1, 1, weight_string(val1) from (select id, val1 from `user` where 1 != 1) as x where 1 != 1", "OrderBy": "(1|3) ASC", - "Query": "select id, val1, 1, weight_string(val1) from (select id, val1 from `user` where val2 < 4) as x order by `user`.val1 asc limit :__upper_limit", + "Query": "select id, val1, 1, weight_string(val1) from (select id, val1 from `user` where val2 < 4) as x order by x.val1 asc limit :__upper_limit", "Table": "`user`" } ] @@ -6439,7 +6491,7 @@ }, "FieldQuery": "select 1, id, weight_string(id) from (select 1 as one, id from `user` where 1 != 1) as subquery_for_count where 1 != 1", "OrderBy": "(1|2) DESC", - "Query": "select 1, id, weight_string(id) from (select 1 as one, id from `user` where `user`.is_not_deleted = true) as 
subquery_for_count order by id desc limit :__upper_limit", + "Query": "select 1, id, weight_string(id) from (select 1 as one, id from `user` where `user`.is_not_deleted = true) as subquery_for_count order by subquery_for_count.id desc limit :__upper_limit", "Table": "`user`" } ] @@ -6455,5 +6507,43 @@ "comment": "baz in the HAVING clause can't be accessed because of the GROUP BY", "query": "select foo, count(bar) as x from user group by foo having baz > avg(baz) order by x", "plan": "Unknown column 'baz' in 'having clause'" + }, + { + "comment": "Aggregation over a ORDER BY/LIMIT inside a derived table", + "query": "SELECT COUNT(*) FROM (SELECT 1 AS one FROM `user` WHERE `user`.`is_not_deleted` = true ORDER BY id DESC LIMIT 25 OFFSET 0) subquery_for_count", + "plan": { + "QueryType": "SELECT", + "Original": "SELECT COUNT(*) FROM (SELECT 1 AS one FROM `user` WHERE `user`.`is_not_deleted` = true ORDER BY id DESC LIMIT 25 OFFSET 0) subquery_for_count", + "Instructions": { + "OperatorType": "Aggregate", + "Variant": "Scalar", + "Aggregates": "count_star(0) AS count(*)", + "ResultColumns": 1, + "Inputs": [ + { + "OperatorType": "Limit", + "Count": "INT64(25)", + "Offset": "INT64(0)", + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "Scatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "FieldQuery": "select 1, id, weight_string(id) from (select 1 as one, id from `user` where 1 != 1) as subquery_for_count where 1 != 1", + "OrderBy": "(1|2) DESC", + "Query": "select 1, id, weight_string(id) from (select 1 as one, id from `user` where `user`.is_not_deleted = true) as subquery_for_count order by subquery_for_count.id desc limit :__upper_limit", + "Table": "`user`" + } + ] + } + ] + }, + "TablesUsed": [ + "user.user" + ] + } } ]