Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support unparsing plans after applying optimize_projections rule #13267

Closed
wants to merge 4 commits into from

Conversation

sgrebnov
Copy link
Member

@sgrebnov sgrebnov commented Nov 5, 2024

Which issue does this PR close?

The optimize_projections optimization is very useful as it pushes down projections to the TableScan and ensures only required columns are fetched. This is useful when used alongside unparsing scenarios for plans involving multiple data sources, as the plan must be optimized to push down projections and fetch only the required columns. The downside of this process is that the rule modifies the original plan in a way that makes it difficult to unparse, and the resultant plan is not always optimal or efficient for unparsing use cases, for example

https://gist.github.com/sgrebnov/5071d2834e812b62bfdf434cf7e7e54c

Original query (TPC-DS Q72)

select  i_item_desc
      ,w_warehouse_name
      ,d1.d_week_seq
      ,sum(case when p_promo_sk is null then 1 else 0 end) no_promo
      ,sum(case when p_promo_sk is not null then 1 else 0 end) promo
      ,count(*) total_cnt
from catalog_sales
join inventory on (cs_item_sk = inv_item_sk)
join warehouse on (w_warehouse_sk=inv_warehouse_sk)
join item on (i_item_sk = cs_item_sk)
join customer_demographics on (cs_bill_cdemo_sk = cd_demo_sk)
join household_demographics on (cs_bill_hdemo_sk = hd_demo_sk)
join date_dim d1 on (cs_sold_date_sk = d1.d_date_sk)
join date_dim d2 on (inv_date_sk = d2.d_date_sk)
join date_dim d3 on (cs_ship_date_sk = d3.d_date_sk)
left outer join promotion on (cs_promo_sk=p_promo_sk)
left outer join catalog_returns on (cr_item_sk = cs_item_sk and cr_order_number = cs_order_number)
where d1.d_week_seq = d2.d_week_seq
  and inv_quantity_on_hand < cs_quantity
  and d3.d_date > d1.d_date + INTERVAL '5 days'
  and hd_buy_potential = '501-1000'
  and d1.d_year = 1999
  and cd_marital_status = 'S'
group by i_item_desc,w_warehouse_name,d1.d_week_seq
order by total_cnt desc, i_item_desc, w_warehouse_name, d_week_seq
 LIMIT 100;

Plan and query after applying optimize_projections rule. Notice the additional projections added after joins.

image
select
	"i_item_desc",
	"w_warehouse_name",
	"d_week_seq",
	sum(case when "p_promo_sk" is null then 1 else 0 end) as "no_promo",
	sum(case when "p_promo_sk" is not null then 1 else 0 end) as "promo",
	count(1) as "total_cnt"
from
	(
	select
		"w_warehouse_name",
		"i_item_desc",
		"d_week_seq",
		"p_promo_sk"
	from
		(
		select
			"cs_item_sk",
			"cs_order_number",
			"w_warehouse_name",
			"i_item_desc",
			"d_week_seq",
			"promotion"."p_promo_sk"
		from
			(
			select
				"cs_item_sk",
				"cs_promo_sk",
				"cs_order_number",
				"w_warehouse_name",
				"i_item_desc",
				"d_week_seq"
			from
				(
				select
					"cs_ship_date_sk",
					"cs_item_sk",
...

Rationale for this change

To support unparsing plans after optimize_projections is applied, it is proposed to add the optimize_projections_preserve_existing_projections configuration option to prevent the optimization logic from creating or removing projections and to preserve the original structure. It ensures the query layout remains simple and readable, relying on the underlying SQL engine to apply its own optimizations during execution.

Are these changes tested?

Added test for optimize_projections_preserve_existing_projections configuration option. Unparsing have been tested by running all TPC-H and TPC-DS queries with optimization_projections enabled.

Are there any user-facing changes?

Yes, a new optimize_projections_preserve_existing_projections configuration option has been introduced, which can be specified via SessionConfig or at a lower level using OptimizerContext::new_with_options.

SessionStateBuilder::new()
  .with_config(
      SessionConfig::new().with_optimize_projections_preserve_existing_projections(true),
  )
  .build();

Or

let mut config = ConfigOptions::new();
config
    .optimizer
    .optimize_projections_preserve_existing_projections =
    preserve_projections;
let optimizer_context = OptimizerContext::new_with_options(config);

There are no changes in default behavior.

@github-actions github-actions bot added sql SQL Planner optimizer Optimizer rules common Related to common crate execution Related to the execution crate labels Nov 5, 2024
@sgrebnov sgrebnov force-pushed the sgrebnov/projections branch from 90b33e1 to 3b3f58e Compare November 5, 2024 22:38

/// When set to true, the `optimize_projections` rule will not attempt to move, add, or remove existing projections.
/// This flag helps maintain the original structure of the `LogicalPlan` when converting it back into SQL via the `unparser` module. It ensures the query layout remains simple and readable, relying on the underlying SQL engine to apply its own optimizations during execution.
pub optimize_projections_preserve_existing_projections: bool, default = false
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might it be better to make this flag more generic, for example, just preserve_existing_projections or prefer_existing_plan_nodes, so it can be reused in the future in similar cases

// Avoid creating a duplicate Projection node, which would result in an additional subquery if a projection already exists.
// For example, if the `optimize_projection` rule is applied, there will be a Projection node, and duplicate projection
// information included in the TableScan node.
if !already_projected {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This prevents from generating queries like SELECT a, b from (SELECT a, b from my_table).

@@ -882,6 +882,7 @@ fn test_table_scan_pushdown() -> Result<()> {
let query_from_table_scan_with_projection = LogicalPlanBuilder::from(
table_scan(Some("t1"), &schema, Some(vec![0, 1]))?.build()?,
)
.project(vec![col("id"), col("age")])?
.project(vec![wildcard()])?
Copy link
Member Author

@sgrebnov sgrebnov Nov 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this actually be a real plan with a wildcard projection and a TableScan that includes only two columns? I would expect them to match. If this is a real use case I will improve logic above (check for parent projection is a wildcard or does not match). Running all TPC-H and TPC-DS queries I've not found query where it was the case.
https://github.com/apache/datafusion/pull/13267/files#r1830100022

@sgrebnov sgrebnov force-pushed the sgrebnov/projections branch from 3b3f58e to 32da325 Compare November 5, 2024 22:57
@github-actions github-actions bot added the sqllogictest SQL Logic Tests (.slt) label Nov 6, 2024
@sgrebnov sgrebnov force-pushed the sgrebnov/projections branch from 3ae32bf to 808b99f Compare November 6, 2024 00:30
@sgrebnov sgrebnov force-pushed the sgrebnov/projections branch from 808b99f to 37b5245 Compare November 6, 2024 00:56
@github-actions github-actions bot added the documentation Improvements or additions to documentation label Nov 6, 2024
@findepi
Copy link
Member

findepi commented Nov 6, 2024

Would this be equivalent to disabling optimize_projections?

@sgrebnov
Copy link
Member Author

sgrebnov commented Nov 6, 2024

Would this be equivalent to disabling optimize_projections?

@findepi - the logic of optimize_projections is still working in this case pushing down only required columns, for example

explain SELECT ss_item_sk, trip_distance_mi, total_amount
FROM store_sales
JOIN taxi_trips
ON store_sales.ss_quantity = taxi_trips.passenger_count
WHERE taxi_trips.total_amount=3.0 and ss_item_sk= 180
ORDER BY trip_distance_mi DESC LIMIT 10;

Without optimize_projections

| logical_plan  | Sort: taxi_trips.trip_distance_mi DESC NULLS FIRST, fetch=10                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
|               |   Projection: store_sales.ss_item_sk, taxi_trips.trip_distance_mi, taxi_trips.total_amount                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           |
|               |     Inner Join: CAST(store_sales.ss_quantity AS Int64) = taxi_trips.passenger_count                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
|               |       Projection: store_sales.ss_item_sk, store_sales.ss_quantity                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
|               |         BytesProcessedNode                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           |
|               |           Federated                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
|               |  Projection: store_sales.ss_sold_date_sk, store_sales.ss_sold_time_sk, store_sales.ss_item_sk, store_sales.ss_customer_sk, store_sales.ss_cdemo_sk, store_sales.ss_hdemo_sk, store_sales.ss_addr_sk, store_sales.ss_store_sk, store_sales.ss_promo_sk, store_sales.ss_ticket_number, store_sales.ss_quantity, store_sales.ss_wholesale_cost, store_sales.ss_list_price, store_sales.ss_sales_price, store_sales.ss_ext_discount_amt, store_sales.ss_ext_sales_price, store_sales.ss_ext_wholesale_cost, store_sales.ss_ext_list_price, store_sales.ss_ext_tax, store_sales.ss_coupon_amt, store_sales.ss_net_paid, store_sales.ss_net_paid_inc_tax, store_sales.ss_net_profit                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
|               |   Filter: CAST(store_sales.ss_item_sk AS Int64) = Int64(180)                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
|               |     TableScan: store_sales, partial_filters=[CAST(store_sales.ss_item_sk AS Int64) = Int64(180)]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
|               |       Projection: taxi_trips.passenger_count, taxi_trips.trip_distance_mi, taxi_trips.total_amount                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
|               |         BytesProcessedNode                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           |
|               |           Federated                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
|               |  Projection: taxi_trips.pickup_datetime, taxi_trips.passenger_count, taxi_trips.trip_distance_mi, taxi_trips.fare_amount, taxi_trips.tip_amount, taxi_trips.total_amount                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
|               |   TableScan: taxi_trips, full_filters=[taxi_trips.total_amount = Float64(3)]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |

With optimize_projections and optimize_projections_preserve_existing_projections

| logical_plan  | Sort: taxi_trips.trip_distance_mi DESC NULLS FIRST, fetch=10                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
|               |   Projection: store_sales.ss_item_sk, taxi_trips.trip_distance_mi, taxi_trips.total_amount                                                                                                                                                                                                                                                                                                                                                                                                                       |
|               |     Inner Join: CAST(store_sales.ss_quantity AS Int64) = taxi_trips.passenger_count                                                                                                                                                                                                                                                                                                                                                                                                                              |
|               |       BytesProcessedNode                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
|               |         Federated                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
|               |  Projection: store_sales.ss_item_sk, store_sales.ss_quantity                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
|               |   Filter: CAST(store_sales.ss_item_sk AS Int64) = Int64(180)                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
|               |     TableScan: store_sales projection=[ss_item_sk, ss_quantity], partial_filters=[CAST(store_sales.ss_item_sk AS Int64) = Int64(180)]                                                                                                                                                                                                                                                                                                                                                                            |
|               |       BytesProcessedNode                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
|               |         Federated                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
|               |  Projection: taxi_trips.passenger_count, taxi_trips.trip_distance_mi, taxi_trips.total_amount                                                                                                                                                                                                                                                                                                                                                                                                                    |
|               |   TableScan: taxi_trips projection=[passenger_count, trip_distance_mi, total_amount], full_filters=[taxi_trips.total_amount = Float64(3)]                                                                                                                                                                                                                                                                                                                                                                        |

Before

Projection: store_sales.ss_sold_date_sk, store_sales.ss_sold_time_sk, store_sales.ss_item_sk, store_sales.ss_customer_sk, store_sales.ss_cdemo_sk, store_sales.ss_hdemo_sk, store_sales.ss_addr_sk, store_sales.ss_store_sk, store_sales.ss_promo_sk, store_sales.ss_ticket_number, store_sales.ss_quantity, store_sales.ss_wholesale_cost, store_sales.ss_list_price, store_sales.ss_sales_price, store_sales.ss_ext_discount_amt, store_sales.ss_ext_sales_price, store_sales.ss_ext_wholesale_cost, store_sales.ss_ext_list_price, store_sales.ss_ext_tax, store_sales.ss_coupon_amt, store_sales.ss_net_paid, store_sales.ss_net_paid_inc_tax, store_sales.ss_net_profit                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
|               |   Filter: CAST(store_sales.ss_item_sk AS Int64) = Int64(180)                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
|               |     TableScan: store_sales, partial_filters=[CAST(store_sales.ss_item_sk AS Int64) = Int64(180)]            

After:

|               |  Projection: store_sales.ss_item_sk, store_sales.ss_quantity                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
|               |   Filter: CAST(store_sales.ss_item_sk AS Int64) = Int64(180)                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
|               |     TableScan: store_sales projection=[ss_item_sk, ss_quantity], partial_filters=[CAST(store_sales.ss_item_sk AS Int64) = Int64(180)]   

@goldmedal
Copy link
Contributor

I'll review this PR tonight.

@goldmedal

This comment was marked as outdated.

@alamb
Copy link
Contributor

alamb commented Nov 8, 2024

It seems confusing to disable only parts of an optimization rule for unparsing. I agree with @findepi that it seems more logical to disable the entire rule rather than just some parts of it.

Notice the additional projections added after joins.

What is the problem with the output?

While there is an extra layer of projection it also doesn't seem like it would impact performance (it would likely be flattened by SQL optimizers)

Is the issue that it is hard to read for a human? If so, could you potentially write some sort of "projection pullup" pass that pulls up the parts you are interested in pulling up / removing?

@sgrebnov
Copy link
Member Author

sgrebnov commented Nov 8, 2024

@alamb - thank you for the feedback, the main challenge is that optimized plan can't be converted back to SQL after optimization (it is both added/moved and also removed LogicalPlan::Projection nodes). For example, the way aggregation functions are unprojected relies on certain assumptions about plan structure, etc (I actually started from this approach/unparser logic change but then realized that it adds a lot of additional complexity to existing unparsing logic).

The end goal is to prune unnecessary columns in cases where the plan needs to be split into sub-plans and executed separately via converting to SQL. In this scenario, pushing projections/columns down ensures that only the data required for final execution is fetched. Keeping the layout simple and human-readable is beneficial but a secondary priority.

Would implementing a custom own optimizer using optimize_projections as a reference be a recommended alternative approach in this case?

@findepi
Copy link
Member

findepi commented Nov 9, 2024

@sgrebnov is there a different example that could help understand the problem being solved?

the current PR description indicates the unparsed SQL being more complex and more nested that it otherwise could be, but why is this actually a problem? When splitting and routing queries, readability is nice to have, but not sure if it can be expected. Or there some other problem behind there?

@sgrebnov
Copy link
Member Author

sgrebnov commented Nov 9, 2024

@findepi - sorry, my bad – I should have provided more context

We use Datafusion with DataFusion Federation to convert user queries into a LogicalPlan, then detect which parts of the plan belong to external execution engines. These parts are converted to SQL (unparsed) and executed by remote execution engines as part of the overall query execution. For example, in the scenario below, parts of the LogicalPlan are executed using external engines (MySQL and PostgreSQL) via unparsing corresponding sub-plans, with final aggregation/join processed by DataFusion. All of this happens as part of DataFusion’s execution logic. If there are multiple external engines involved, only parts of the plan are converted (see example below), so when we don’t have optimized/pushed-down projections, we end up fetching all columns. With projections optimization we propagate required columns to child nodes so only required columns could be fetched. Thus, the goal is to have projection columns pruning optimization enabled and to be able to unparse the logical plan back to SQL afterward. Please let me know if I should elaborate more on the challenges with the unparser after the optimization rule is applied.

                  ┌────────────────────────┐
                  │   Join / Aggregation   │               B and C are
                  └────────────────────────┘               available in an
                               ▲                           external database
                               │                           DBMS-2 (PostreSQL)
                               │
A is available in an           │                           Unparse -> SQL
external database in           ├─────────────────────┐
DBMS-1 (MySQL).                │┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
                  ┌────────────┘                     │
Unparse -> SQL    │             │                    │                      │
                  │                          ┌───────┴──────┐
                  │             │            │     Join     │               │
    ┌ ─ ─ ─ ─ ─ ─ ┼ ─ ─ ─ ─ ─ ┐              └───────▲──────┘
                  │             │                    │                      │
    │             │           │            ┌─────────┴──────────┐
         ┌────────┴───────┐     │          │                    │           │
    │    │     Scan A     │   │            │                    │
         └────────────────┘     │ ┌────────────────┐   ┌────────────────┐   │
    │                         │   │     Scan B     │   │     Scan C     │
                                │ └────────────────┘   └────────────────┘   │
    │                         │
     ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘

@goldmedal
Copy link
Contributor

I enabled optimize_projections_preserve_existing_projections to run the sqllogictests. I found that many projection push-downs do not work (the physical plan changes), and some queries fail.

Push-down doesn’t work, but the query still succeeds (execution behavior may vary):

  • Join
  • Union
  • Predicate
  • Sort
  • Window

Query failures:

  • same_column_name_cross_join.slt

It appears the physical planner struggles with additional projections. I believe optimize_projections_preserve_existing_projections is an unparser-specific config and shouldn't be used in query execution. However, I see the value in the additional projection for the unparser, as it helps identify the SQL body (SELECT) more easily.

As @alamb mentioned, having a config that changes part of an optimization can be confusing. Here are some potential solutions:

  • Instead of modifying the existing optimization, create a separate optimize_projection rule for unparsing, allowing users to optimize their plan for SQL generation.
  • Provide a config to switch optimize_projection into unparsing mode (though this might complicate the codebase).
  • Improve the unparser to handle the original optimized plan.

@findepi
Copy link
Member

findepi commented Nov 10, 2024

With projections optimization we propagate required columns to child nodes so only required columns could be fetched.

makes sense!
this is important, agreed

can you reiterate or exemplify for the benefit of my understanding what's the problem if you run all of optimize_projection?

@sgrebnov
Copy link
Member Author

sgrebnov commented Nov 13, 2024

@findepi - there are 3 main challenges with unparsing optimized plan w/o this improvement

1. Column references require additional processing to project correctly

let mut config = ConfigOptions::new();
config
    .optimizer
    .optimize_projections_preserve_existing_projections = false;
let state = SessionConfig::from(config);
let ctx = SessionContext::new_with_config(state);

let optimizer = OptimizeProjections::new();

ctx.sql("CREATE TABLE test (a int, b int, c int)").await?;
let sql = "SELECT a, sum(b) as sum_qty, count(*) as count_order FROM test where c > 5 group by a";

let df = ctx.sql(&sql).await?;

let original_plan = df.logical_plan().clone();
println!("original_plan\n:{}", original_plan.display_indent());
println!("sql: {}", plan_to_sql(&original_plan)?.to_string());

let optimized_plan = optimizer.rewrite(original_plan, &ctx.state()).data()?;
println!("optimized_plan\n: {}", optimized_plan.display_indent());
println!("sql: {}", plan_to_sql(&optimized_plan)?.to_string());

Will produce the following

original_plan:

Projection: test.a, sum(test.b) AS sum_qty, count(*) AS count_order
  Aggregate: groupBy=[[test.a]], aggr=[[sum(test.b), count(*)]]
    Filter: test.c > Int64(5)
      TableScan: test

sql: SELECT test.a, sum(test.b) AS sum_qty, count(*) AS count_order FROM test WHERE (test.c > 5) GROUP BY test.a

optimized_plan:

Projection: test.a, sum(test.b) AS sum_qty, count(*) AS count_order
  Aggregate: groupBy=[[test.a]], aggr=[[sum(test.b), count(*)]]
    Projection: test.a, test.b
      Filter: test.c > Int64(5)
        TableScan: test projection=[a, b, c]

sql: SELECT test.a, sum(test.b) AS sum_qty, count(*) AS count_order FROM (SELECT test.a, test.b FROM test WHERE (test.c > 5)) GROUP BY test.a

As new projection added (Projection: test.a, test.b) we now have additional subquery so we can't unparse the original projection columns as-is anymore:

The following is invalid

SELECT test.a, sum(test.b) AS sum_qty, count(*) AS count_order FROM (SELECT test.a, test.b FROM test WHERE (test.c > 5)) GROUP BY test.a

Should be re-written as

SELECT a, sum(b) AS sum_qty, count(*) AS count_order FROM (SELECT test.a, test.b FROM test WHERE (test.c > 5)) GROUP BY a

This affects not just SELECT part but also other query components (grouping, ordering, join, window/aggregation funcitons/etc). This could be fixed by using additional post processing (collecting allowed table names and aliases and updating columns referring to tables/aliases that are not in scope anymore). There are some tricky cases (union, joins where all allowed references are not yet collected).

2. Unprojecting aggregation and window expressions

Existing logic (few different cases) relies on Projections to identify and correctly unproject columns based on aggregations. With the optimization, Projection nodes could be removed and the overall identification approach needs improvements to work correct with removed Projections
https://github.com/apache/datafusion/blob/main/datafusion/sql/src/unparser/utils.rs#L37
https://github.com/apache/datafusion/blob/main/datafusion/sql/src/unparser/plan.rs#L186
https://github.com/apache/datafusion/blob/main/datafusion/sql/src/unparser/plan.rs#L329

3. Helps identify SQL body (SELECT) in general

Current unparsing logic is based on Projection to identify where to start a subquery (derive) - this requires improvements to work correct with removed Projections.

@goldmedal - does the "Push-down doesn’t work" part mean that projections are not pushed correctly (does not actually work) or something else? Could you please add more clarity on this. Going to test with sqllogictests as well - thank you for pointing to this.

The main question is: do you think this work is very specific, meaning the improvement should be added directly to the project, or do you believe other developers could benefit from it, so we should continue exploring the best way to integrate/add this to the DataFusion?

@goldmedal
Copy link
Contributor

@goldmedal - does the "Push-down doesn’t work" part mean that projections are not pushed correctly (does not actually work) or something else? Could you please add more clarity on this. Going to test with sqllogictests as well - thank you for pointing to this.

For example, the test in joins.slt.

query TT
EXPLAIN
select count(distinct join_t1.t1_id)
from join_t1
inner join join_t2 on join_t1.t1_id = join_t2.t2_id

After enabling optimize_projections_preserve_existing_projections, the projection for HashJoinExec will be removed.

@@ -1425,10 +1433,9 @@ logical_plan
 01)Projection: count(alias1) AS count(DISTINCT join_t1.t1_id)
 02)--Aggregate: groupBy=[[]], aggr=[[count(alias1)]]
 03)----Aggregate: groupBy=[[join_t1.t1_id AS alias1]], aggr=[[]]
-04)------Projection: join_t1.t1_id
-05)--------Inner Join: join_t1.t1_id = join_t2.t2_id
-06)----------TableScan: join_t1 projection=[t1_id]
-07)----------TableScan: join_t2 projection=[t2_id]
+04)------Inner Join: join_t1.t1_id = join_t2.t2_id
+05)--------TableScan: join_t1 projection=[t1_id]
+06)--------TableScan: join_t2 projection=[t2_id]
 physical_plan
 01)ProjectionExec: expr=[count(alias1)@0 as count(DISTINCT join_t1.t1_id)]
 02)--AggregateExec: mode=Final, gby=[], aggr=[count(alias1)]
@@ -1436,7 +1443,7 @@ physical_plan
 04)------AggregateExec: mode=Partial, gby=[], aggr=[count(alias1)]
 05)--------AggregateExec: mode=SinglePartitioned, gby=[t1_id@0 as alias1], aggr=[]
 06)----------CoalesceBatchesExec: target_batch_size=2
-07)------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(t1_id@0, t2_id@0)], projection=[t1_id@0]
+07)------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(t1_id@0, t2_id@0)]
 08)--------------CoalesceBatchesExec: target_batch_size=2
 09)----------------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
 10)------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1

In this case, the pushdown doesn't work, but there are other similar cases. If you set this config to true by default, you can use the completed mode of the sqllogictests to find the cases in whichs the physical plan has changed.

cargo test --test sqllogictests -- --complete

The main question is: do you think this work is very specific, meaning the improvement should be added directly to the project, or do you believe other developers could benefit from it, so we should continue exploring the best way to integrate/add this to the DataFusion?

Ideally, we should optimize the logical plans while retaining completeness, enabling us to unparse them back to SQL text easily—for example, by preserving Projection. Meanwhile, the physical planner can still generate an efficient physical plan with the necessary optimizations (e.g., Join with pushdown). If we can enable this configuration by default without altering any original physical plan, that would be the best-case scenario.

However, achieving this would be a huge epic, with many issues needing resolution before enabling it by default 🤔.

One compromise is what I mentioned previously in #13267 (comment). We could introduce a configuration (or simply rename optimize_projections_preserve_existing_projections) to switch the optimization to an unparse-friendly mode. Additionally, we should document that users should avoid executing SQL with this configuration enabled, as some SQL queries may fail in this mode.

https://github.com/apache/datafusion/blob/main/datafusion/sqllogictest/test_files/same_column_name_cross_join.slt

@sgrebnov
Copy link
Member Author

sgrebnov commented Nov 14, 2024

@goldmedal - thank you for the detailed response. I'll review failing tests to get better understanding of why they are failing and if something can be improved within next few days and reply back. I like the idea of having this as a switch for unparser-friendly mode. As I've mentioned before there is similar case with predicates push down optimizer (but which could be controlled on the client side / for which nodes optimization is applied) and I plan to investigate this more deeply with the target goal and priority of improving unparsing instead of adding something to the optimizer (but this still could be the case).

@findepi
Copy link
Member

findepi commented Nov 16, 2024

Meanwhile, the physical planner can still generate an efficient physical plan with the necessary optimizations (e.g., Join with pushdown). If we can enable this configuration by default without altering any original physical plan, that would be the best-case scenario.

That would impose a limit on future optimizations on logical plans, which isn't desirable.

  1. Column references require additional processing to project correctly

Can unparsing do this additional processing as needed for unparsing?

Generally, is unparsing expected to produce a query plan that is very similar to original query parsed by DF?
I hope that's a non goal, but if it is, it should really disable logical plan optimizations and maybe just do nothing more than projection pushdown.

@goldmedal

This comment was marked as outdated.

@berkaysynnada
Copy link
Contributor

I didn’t have the chance to fully read through the discussion but IIUC, logical plans should always aim to provide the most optimal plan, independent of physical plans and their optimizations. We shouldn’t rely on physical plan behavior to ensure a truly modular engine. A user might reasonably expect the most optimal logical plan to be produced after logical optimizations and might proceed with custom physical rules that lack the expected capability.

@goldmedal
Copy link
Contributor

..., logical plans should always aim to provide the most optimal plan, independent of physical plans and their optimizations.

Thanks @berkaysynnada
I see. If that's the main goal, I believe we should focus on continuously enhancing the unparsing behavior rather than modifying the optimization rule. This configuration should never be enabled by default. I suppose that's why @findepi mentioned it is desirable.

@findepi
Copy link
Member

findepi commented Nov 20, 2024

In other words, unparsing feature should not impose any constraints on logical plans produced by DF optimizer.

We should have a solid definition what is and what isn't a valid logical plan (and we should do a few changes in that area, #12723), but other than that unparsing should not impose any limitations on what logical plans DF creates.

@alamb alamb marked this pull request as draft November 23, 2024 13:19
@alamb
Copy link
Contributor

alamb commented Nov 23, 2024

Marking as draft as I think this PR is no longer waiting on feedback. Please mark it as ready for review when it is ready for another look

I am trying to work down the review queue

@sgrebnov
Copy link
Member Author

@alamb - makes sense, thank you. I'm going to close this PR as I don't think it makes sense to add this to DF, instead I'll create separate PR with just unparser improvement and will use custom optimizer rule.

/cc @findepi, @goldmedal

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
common Related to common crate documentation Improvements or additions to documentation execution Related to the execution crate optimizer Optimizer rules sql SQL Planner sqllogictest SQL Logic Tests (.slt)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants