Support unparsing plans after applying `optimize_projections` rule #13267
/// When set to true, the `optimize_projections` rule will not attempt to move, add, or remove existing projections.
/// This flag helps maintain the original structure of the `LogicalPlan` when converting it back into SQL via the `unparser` module. It ensures the query layout remains simple and readable, relying on the underlying SQL engine to apply its own optimizations during execution.
pub optimize_projections_preserve_existing_projections: bool, default = false
Might it be better to make this flag more generic, for example just `preserve_existing_projections` or `prefer_existing_plan_nodes`, so it can be reused in similar cases in the future?
// Avoid creating a duplicate Projection node, which would result in an additional subquery if a projection already exists.
// For example, if the `optimize_projection` rule is applied, there will be a Projection node, and duplicate projection
// information included in the TableScan node.
if !already_projected {
This prevents generating queries like `SELECT a, b FROM (SELECT a, b FROM my_table)`.
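To illustrate the effect of the `already_projected` check, here is a minimal, self-contained sketch. It is a toy model only: the `Plan` enum and the `unparse` function are hypothetical names made up for this example and are not DataFusion's `LogicalPlan` or unparser API. It also assumes the parent projection only references columns that the scan already provides.

```rust
/// Toy plan model (hypothetical, NOT DataFusion's `LogicalPlan`).
#[derive(Debug, Clone)]
enum Plan {
    /// Scan of `table`, optionally with a pushed-down column list.
    TableScan { table: String, projection: Option<Vec<String>> },
    /// Explicit projection over an input plan.
    Projection { columns: Vec<String>, input: Box<Plan> },
}

/// Render a plan as SQL text.
/// `already_projected` is true when a parent Projection has already
/// produced the SELECT list for the scan being rendered.
fn unparse(plan: &Plan, already_projected: bool) -> String {
    match plan {
        Plan::Projection { columns, input } => {
            let source = match input.as_ref() {
                // A nested Projection is a genuine subquery and must be parenthesized.
                Plan::Projection { .. } => format!("({})", unparse(input, false)),
                // A scan directly below reuses this Projection's SELECT list.
                scan => unparse(scan, true),
            };
            format!("SELECT {} FROM {}", columns.join(", "), source)
        }
        Plan::TableScan { table, projection } => match projection {
            // The parent already projects: emit only the table name.
            Some(_) if already_projected => table.clone(),
            // No parent projection: the scan's column list needs its own subquery.
            Some(cols) => format!("(SELECT {} FROM {})", cols.join(", "), table),
            None => table.clone(),
        },
    }
}
```

With a `Projection` of `a, b` over a scan of `my_table` that also carries `projection = [a, b]`, `unparse(&plan, false)` yields a single-level `SELECT a, b FROM my_table`. Rendering the scan on its own with `already_projected = false` yields the parenthesized subquery that would otherwise be nested.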
@@ -882,6 +882,7 @@ fn test_table_scan_pushdown() -> Result<()> {
let query_from_table_scan_with_projection = LogicalPlanBuilder::from(
table_scan(Some("t1"), &schema, Some(vec![0, 1]))?.build()?,
)
.project(vec![col("id"), col("age")])?
.project(vec![wildcard()])?
Can this actually be a real plan, with a wildcard projection and a TableScan that includes only two columns? I would expect them to match. If this is a real use case, I will improve the logic above (check whether the parent projection is a wildcard or does not match). Running all TPC-H and TPC-DS queries, I have not found a query where this was the case.
https://github.com/apache/datafusion/pull/13267/files#r1830100022
Would this be equivalent to disabling |
@findepi - the logic of

explain SELECT ss_item_sk, trip_distance_mi, total_amount
FROM store_sales
JOIN taxi_trips
ON store_sales.ss_quantity = taxi_trips.passenger_count
WHERE taxi_trips.total_amount=3.0 and ss_item_sk= 180
ORDER BY trip_distance_mi DESC LIMIT 10;

Without

logical_plan
Sort: taxi_trips.trip_distance_mi DESC NULLS FIRST, fetch=10
  Projection: store_sales.ss_item_sk, taxi_trips.trip_distance_mi, taxi_trips.total_amount
    Inner Join: CAST(store_sales.ss_quantity AS Int64) = taxi_trips.passenger_count
      Projection: store_sales.ss_item_sk, store_sales.ss_quantity
        BytesProcessedNode
          Federated
            Projection: store_sales.ss_sold_date_sk, store_sales.ss_sold_time_sk, store_sales.ss_item_sk, store_sales.ss_customer_sk, store_sales.ss_cdemo_sk, store_sales.ss_hdemo_sk, store_sales.ss_addr_sk, store_sales.ss_store_sk, store_sales.ss_promo_sk, store_sales.ss_ticket_number, store_sales.ss_quantity, store_sales.ss_wholesale_cost, store_sales.ss_list_price, store_sales.ss_sales_price, store_sales.ss_ext_discount_amt, store_sales.ss_ext_sales_price, store_sales.ss_ext_wholesale_cost, store_sales.ss_ext_list_price, store_sales.ss_ext_tax, store_sales.ss_coupon_amt, store_sales.ss_net_paid, store_sales.ss_net_paid_inc_tax, store_sales.ss_net_profit
              Filter: CAST(store_sales.ss_item_sk AS Int64) = Int64(180)
                TableScan: store_sales, partial_filters=[CAST(store_sales.ss_item_sk AS Int64) = Int64(180)]
      Projection: taxi_trips.passenger_count, taxi_trips.trip_distance_mi, taxi_trips.total_amount
        BytesProcessedNode
          Federated
            Projection: taxi_trips.pickup_datetime, taxi_trips.passenger_count, taxi_trips.trip_distance_mi, taxi_trips.fare_amount, taxi_trips.tip_amount, taxi_trips.total_amount
              TableScan: taxi_trips, full_filters=[taxi_trips.total_amount = Float64(3)]

With

logical_plan
Sort: taxi_trips.trip_distance_mi DESC NULLS FIRST, fetch=10
  Projection: store_sales.ss_item_sk, taxi_trips.trip_distance_mi, taxi_trips.total_amount
    Inner Join: CAST(store_sales.ss_quantity AS Int64) = taxi_trips.passenger_count
      BytesProcessedNode
        Federated
          Projection: store_sales.ss_item_sk, store_sales.ss_quantity
            Filter: CAST(store_sales.ss_item_sk AS Int64) = Int64(180)
              TableScan: store_sales projection=[ss_item_sk, ss_quantity], partial_filters=[CAST(store_sales.ss_item_sk AS Int64) = Int64(180)]
      BytesProcessedNode
        Federated
          Projection: taxi_trips.passenger_count, taxi_trips.trip_distance_mi, taxi_trips.total_amount
            TableScan: taxi_trips projection=[passenger_count, trip_distance_mi, total_amount], full_filters=[taxi_trips.total_amount = Float64(3)]

Before
After:
I'll review this PR tonight. |
It seems confusing to disable only parts of an optimization rule for unparsing. I agree with @findepi that it seems more logical to disable the entire rule rather than just some parts of it.
What is the problem with the output? While there is an extra layer of projection, it doesn't seem like it would impact performance (it would likely be flattened by SQL optimizers). Is the issue that it is hard to read for a human? If so, could you potentially write some sort of "projection pullup" pass that pulls up the parts you are interested in pulling up / removing?
@alamb - thank you for the feedback. The main challenge is that the optimized plan can't be converted back to SQL after optimization (projections are both added/moved and also removed).

The end goal is to prune unnecessary columns in cases where the plan needs to be split into sub-plans that are executed separately by converting them to SQL. In this scenario, pushing projections/columns down ensures that only the data required for final execution is fetched. Keeping the layout simple and human-readable is beneficial but a secondary priority.

Would implementing a custom own optimizer using
@sgrebnov is there a different example that could help understand the problem being solved? The current PR description indicates that the unparsed SQL is more complex and more nested than it otherwise could be, but why is this actually a problem? When splitting and routing queries, readability is nice to have, but I'm not sure it can be expected. Or is there some other problem behind this?
@findepi - sorry, my bad. I should have provided more context.

We use DataFusion with DataFusion Federation to convert user queries into a LogicalPlan, then detect which parts of the plan belong to external execution engines. These parts are converted to SQL (unparsed) and executed by remote execution engines as part of the overall query execution. For example, in the scenario below, parts of the LogicalPlan are executed using external engines (MySQL and PostgreSQL) by unparsing the corresponding sub-plans, with the final aggregation/join processed by DataFusion. All of this happens as part of DataFusion’s execution logic.

If there are multiple external engines involved, only parts of the plan are converted (see example below), so when we don’t have optimized/pushed-down projections, we end up fetching all columns. With projection optimization, we propagate the required columns to child nodes so that only those columns are fetched.

Thus, the goal is to have projection column pruning enabled and to be able to unparse the logical plan back to SQL afterward. Please let me know if I should elaborate more on the challenges with the unparser after the optimization rule is applied.
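The column-pruning goal described above can be sketched in a few lines. This is a standalone illustration with a hypothetical helper named `remote_sql`. It is not part of DataFusion or DataFusion Federation, and it assumes the set of columns required by the rest of the plan has already been computed.

```rust
/// Build the SQL text sent to a remote engine for one table.
/// `schema` lists every column of the table; `required` lists the columns
/// the rest of the plan actually reads (None = no pruning info available).
fn remote_sql(table: &str, schema: &[&str], required: Option<&[&str]>) -> String {
    let cols: Vec<&str> = match required {
        // Keep schema order, drop columns that nothing downstream reads.
        Some(req) => schema.iter().copied().filter(|c| req.contains(c)).collect(),
        // Without pushed-down projections every column is fetched.
        None => schema.to_vec(),
    };
    format!("SELECT {} FROM {}", cols.join(", "), table)
}
```

For the `taxi_trips` table from the plans earlier in this thread, passing `passenger_count`, `trip_distance_mi` and `total_amount` as `required` produces a three-column query, while passing `None` fetches all six columns.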
I enabled

Push-down doesn’t work, but the query still succeeds (execution behavior may vary):

Query failures:

It appears the physical planner struggles with additional projections. I believe

As @alamb mentioned, having a config that changes part of an optimization can be confusing. Here are some potential solutions:
makes sense! can you reiterate or exemplify for the benefit of my understanding what's the problem if you run all of |
@findepi - there are 3 main challenges with unparsing an optimized plan without this improvement:

1. Column references require additional processing to project correctly

let mut config = ConfigOptions::new();
config
.optimizer
.optimize_projections_preserve_existing_projections = false;
let state = SessionConfig::from(config);
let ctx = SessionContext::new_with_config(state);
let optimizer = OptimizeProjections::new();
ctx.sql("CREATE TABLE test (a int, b int, c int)").await?;
let sql = "SELECT a, sum(b) as sum_qty, count(*) as count_order FROM test where c > 5 group by a";
let df = ctx.sql(&sql).await?;
let original_plan = df.logical_plan().clone();
println!("original_plan\n:{}", original_plan.display_indent());
println!("sql: {}", plan_to_sql(&original_plan)?.to_string());
let optimized_plan = optimizer.rewrite(original_plan, &ctx.state()).data()?;
println!("optimized_plan\n: {}", optimized_plan.display_indent());
println!("sql: {}", plan_to_sql(&optimized_plan)?.to_string());

Will produce the following

original_plan:
Projection: test.a, sum(test.b) AS sum_qty, count(*) AS count_order
  Aggregate: groupBy=[[test.a]], aggr=[[sum(test.b), count(*)]]
    Filter: test.c > Int64(5)
      TableScan: test
sql: SELECT test.a, sum(test.b) AS sum_qty, count(*) AS count_order FROM test WHERE (test.c > 5) GROUP BY test.a
optimized_plan:
Projection: test.a, sum(test.b) AS sum_qty, count(*) AS count_order
  Aggregate: groupBy=[[test.a]], aggr=[[sum(test.b), count(*)]]
    Projection: test.a, test.b
      Filter: test.c > Int64(5)
        TableScan: test projection=[a, b, c]
sql: SELECT test.a, sum(test.b) AS sum_qty, count(*) AS count_order FROM (SELECT test.a, test.b FROM test WHERE (test.c > 5)) GROUP BY test.a

As new projection added (
The following is invalid
Should be re-written as
This affects not just the SELECT part but also other query components (grouping, ordering, joins, window/aggregation functions, etc.). This could be fixed with additional post-processing (collecting allowed table names and aliases and updating columns that refer to tables/aliases no longer in scope). There are some tricky cases (unions, joins where all allowed references are not yet collected).

2. Unprojecting aggregation and window expressions

Existing logic (a few different cases) relies on Projections to identify and correctly unproject columns based on aggregations. With the optimization, Projection nodes can be removed, and the overall identification approach needs improvements to work correctly with removed Projections.

3. Helps identify SQL body (SELECT) in general

Current unparsing logic relies on Projection to identify where to start a subquery (derive). This requires improvements to work correctly with removed Projections.

@goldmedal - does the "Push-down doesn’t work" part mean that projections are not pushed correctly (does not actually work) or something else? Could you please add more clarity on this? Going to test with

The main question is: do you think this work is very specific, meaning the improvement should be added directly to the project, or do you believe other developers could benefit from it, so we should continue exploring the best way to integrate/add this to DataFusion?
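The post-processing idea from challenge 1 (collect the table names and aliases that are in scope, then rewrite column references whose qualifier is no longer visible) can be sketched as follows. This is a simplified illustration under stated assumptions: `requalify` is a hypothetical helper, columns are plain `qualifier.name` strings rather than expressions, and dropping the qualifier is only one possible rewrite (re-qualifying with a subquery alias would be another).

```rust
use std::collections::HashSet;

/// Rewrite `qualifier.column` references whose qualifier is not visible
/// at the current query level. `in_scope` holds the visible table names
/// and aliases. Unqualified names are returned unchanged.
fn requalify(columns: &[&str], in_scope: &HashSet<&str>) -> Vec<String> {
    columns
        .iter()
        .map(|col| match col.split_once('.') {
            // Qualifier hidden by a derived table: fall back to the bare column name.
            Some((qualifier, name)) if !in_scope.contains(qualifier) => name.to_string(),
            // Qualifier still visible, or no qualifier at all: keep as is.
            _ => col.to_string(),
        })
        .collect()
}
```

Applied to the outer query of the optimized plan above, where `test` is hidden inside a derived table, `test.a` would become `a`. The tricky cases mentioned in the comment (unions, joins with references that have not been collected yet) are not handled by this sketch.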
For example, the test in datafusion/datafusion/sqllogictest/test_files/joins.slt Lines 1418 to 1422 in 4e1f839
After enabling `optimize_projections_preserve_existing_projections`, the projection for HashJoinExec will be removed.
@@ -1425,10 +1433,9 @@ logical_plan
01)Projection: count(alias1) AS count(DISTINCT join_t1.t1_id)
02)--Aggregate: groupBy=[[]], aggr=[[count(alias1)]]
03)----Aggregate: groupBy=[[join_t1.t1_id AS alias1]], aggr=[[]]
-04)------Projection: join_t1.t1_id
-05)--------Inner Join: join_t1.t1_id = join_t2.t2_id
-06)----------TableScan: join_t1 projection=[t1_id]
-07)----------TableScan: join_t2 projection=[t2_id]
+04)------Inner Join: join_t1.t1_id = join_t2.t2_id
+05)--------TableScan: join_t1 projection=[t1_id]
+06)--------TableScan: join_t2 projection=[t2_id]
physical_plan
01)ProjectionExec: expr=[count(alias1)@0 as count(DISTINCT join_t1.t1_id)]
02)--AggregateExec: mode=Final, gby=[], aggr=[count(alias1)]
@@ -1436,7 +1443,7 @@ physical_plan
04)------AggregateExec: mode=Partial, gby=[], aggr=[count(alias1)]
05)--------AggregateExec: mode=SinglePartitioned, gby=[t1_id@0 as alias1], aggr=[]
06)----------CoalesceBatchesExec: target_batch_size=2
-07)------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(t1_id@0, t2_id@0)], projection=[t1_id@0]
+07)------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(t1_id@0, t2_id@0)]
08)--------------CoalesceBatchesExec: target_batch_size=2
09)----------------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
10)------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1

In this case, the pushdown doesn't work, but there are other similar cases. If you set this config to true by default, you can use the complete mode of the sqllogictests to find the cases in which the physical plan has changed.
Ideally, we should optimize the logical plans while retaining completeness, enabling us to unparse them back to SQL text easily, for example, by preserving

However, achieving this would be a huge epic, with many issues needing resolution before enabling it by default 🤔. One compromise is what I mentioned previously in #13267 (comment). We could introduce a configuration (or simply rename
@goldmedal - thank you for the detailed response. Within the next few days I'll review the failing tests to better understand why they fail and whether something can be improved, and then reply back. I like the idea of having this as a switch for an unparser-friendly mode. As I've mentioned before, there is a similar case with the predicate push-down optimizer (although there the client can control which nodes the optimization is applied to). I plan to investigate this more deeply, with the goal and priority of improving unparsing rather than adding something to the optimizer (though that could still turn out to be necessary).
That would impose a limit on future optimizations on logical plans, which isn't desirable.
Can unparsing do this additional processing as needed for unparsing? Generally, is unparsing expected to produce a query plan that is very similar to the original query parsed by DF?
I didn’t have the chance to fully read through the discussion but IIUC, logical plans should always aim to provide the most optimal plan, independent of physical plans and their optimizations. We shouldn’t rely on physical plan behavior to ensure a truly modular engine. A user might reasonably expect the most optimal logical plan to be produced after logical optimizations and might proceed with custom physical rules that lack the expected capability. |
Thanks @berkaysynnada |
In other words, the unparsing feature should not impose any constraints on logical plans produced by the DF optimizer. We should have a solid definition of what is and what isn't a valid logical plan (and we should make a few changes in that area, #12723), but other than that, unparsing should not impose any limitations on what logical plans DF creates.
Marking as draft as I think this PR is no longer waiting on feedback. Please mark it as ready for review when it is ready for another look. I am trying to work down the review queue.
@alamb - makes sense, thank you. I'm going to close this PR as I don't think it makes sense to add this to DF. Instead, I'll create a separate PR with just the unparser improvements and will use a custom optimizer rule. /cc @findepi, @goldmedal
Which issue does this PR close?

The `optimize_projections` optimization is very useful as it pushes down projections to the `TableScan` and ensures only required columns are fetched. This is useful when used alongside unparsing scenarios for plans involving multiple data sources, as the plan must be optimized to push down projections and fetch only the required columns. The downside of this process is that the rule modifies the original plan in a way that makes it difficult to unparse, and the resultant plan is not always optimal or efficient for unparsing use cases, for example https://gist.github.com/sgrebnov/5071d2834e812b62bfdf434cf7e7e54c

Original query (TPC-DS Q72)

Plan and query after applying the `optimize_projections` rule. Notice the additional projections added after joins.

Rationale for this change

To support unparsing plans after `optimize_projections` is applied, it is proposed to add the `optimize_projections_preserve_existing_projections` configuration option to prevent the optimization logic from creating or removing projections and to preserve the original structure. It ensures the query layout remains simple and readable, relying on the underlying SQL engine to apply its own optimizations during execution.

Are these changes tested?

Added a test for the `optimize_projections_preserve_existing_projections` configuration option. Unparsing has been tested by running all TPC-H and TPC-DS queries with `optimize_projections` enabled.

Are there any user-facing changes?

Yes, a new `optimize_projections_preserve_existing_projections` configuration option has been introduced, which can be specified via `SessionConfig` or at a lower level using `OptimizerContext::new_with_options`.

There are no changes in default behavior.