Skip to content

Commit

Permalink
Improve unparsing after optimize_projections optimization (#13599)
Browse files Browse the repository at this point in the history
  • Loading branch information
sgrebnov authored Dec 4, 2024
1 parent 1aadce0 commit a62033e
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 26 deletions.
60 changes: 36 additions & 24 deletions datafusion/sql/src/unparser/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,11 @@ impl Unparser<'_> {
) -> Result<()> {
match plan {
LogicalPlan::TableScan(scan) => {
if let Some(unparsed_table_scan) =
Self::unparse_table_scan_pushdown(plan, None)?
{
if let Some(unparsed_table_scan) = Self::unparse_table_scan_pushdown(
plan,
None,
select.already_projected(),
)? {
return self.select_to_sql_recursively(
&unparsed_table_scan,
query,
Expand Down Expand Up @@ -585,6 +587,7 @@ impl Unparser<'_> {
let unparsed_table_scan = Self::unparse_table_scan_pushdown(
plan,
Some(plan_alias.alias.clone()),
select.already_projected(),
)?;
// if the child plan is a TableScan with pushdown operations, we don't need to
// create an additional subquery for it
Expand Down Expand Up @@ -714,6 +717,7 @@ impl Unparser<'_> {
fn unparse_table_scan_pushdown(
plan: &LogicalPlan,
alias: Option<TableReference>,
already_projected: bool,
) -> Result<Option<LogicalPlan>> {
match plan {
LogicalPlan::TableScan(table_scan) => {
Expand Down Expand Up @@ -743,24 +747,29 @@ impl Unparser<'_> {
}
}

if let Some(project_vec) = &table_scan.projection {
let project_columns = project_vec
.iter()
.cloned()
.map(|i| {
let schema = table_scan.source.schema();
let field = schema.field(i);
if alias.is_some() {
Column::new(alias.clone(), field.name().clone())
} else {
Column::new(
Some(table_scan.table_name.clone()),
field.name().clone(),
)
}
})
.collect::<Vec<_>>();
builder = builder.project(project_columns)?;
// Avoid creating a duplicate Projection node, which would result in an additional subquery if a projection already exists.
// For example, if the `optimize_projection` rule is applied, there will be a Projection node, and duplicate projection
// information included in the TableScan node.
if !already_projected {
if let Some(project_vec) = &table_scan.projection {
let project_columns = project_vec
.iter()
.cloned()
.map(|i| {
let schema = table_scan.source.schema();
let field = schema.field(i);
if alias.is_some() {
Column::new(alias.clone(), field.name().clone())
} else {
Column::new(
Some(table_scan.table_name.clone()),
field.name().clone(),
)
}
})
.collect::<Vec<_>>();
builder = builder.project(project_columns)?;
}
}

let filter_expr: Result<Option<Expr>> = table_scan
Expand Down Expand Up @@ -805,14 +814,17 @@ impl Unparser<'_> {
Self::unparse_table_scan_pushdown(
&subquery_alias.input,
Some(subquery_alias.alias.clone()),
already_projected,
)
}
// SubqueryAlias could be rewritten to a plan with a projection as the top node by [rewrite::subquery_alias_inner_query_and_columns].
// The inner table scan could be a scan with pushdown operations.
LogicalPlan::Projection(projection) => {
if let Some(plan) =
Self::unparse_table_scan_pushdown(&projection.input, alias.clone())?
{
if let Some(plan) = Self::unparse_table_scan_pushdown(
&projection.input,
alias.clone(),
already_projected,
)? {
let exprs = if alias.is_some() {
let mut alias_rewriter =
alias.as_ref().map(|alias_name| TableAliasRewriter {
Expand Down
17 changes: 15 additions & 2 deletions datafusion/sql/tests/cases/plan_to_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ fn test_aggregation_without_projection() -> Result<()> {

assert_eq!(
actual,
r#"SELECT sum(users.age), users."name" FROM (SELECT users."name", users.age FROM users) GROUP BY users."name""#
r#"SELECT sum(users.age), users."name" FROM users GROUP BY users."name""#
);

Ok(())
Expand Down Expand Up @@ -926,12 +926,25 @@ fn test_table_scan_pushdown() -> Result<()> {
let query_from_table_scan_with_projection = LogicalPlanBuilder::from(
table_scan(Some("t1"), &schema, Some(vec![0, 1]))?.build()?,
)
.project(vec![wildcard()])?
.project(vec![col("id"), col("age")])?
.build()?;
let query_from_table_scan_with_projection =
plan_to_sql(&query_from_table_scan_with_projection)?;
assert_eq!(
query_from_table_scan_with_projection.to_string(),
"SELECT t1.id, t1.age FROM t1"
);

let query_from_table_scan_with_two_projections = LogicalPlanBuilder::from(
table_scan(Some("t1"), &schema, Some(vec![0, 1]))?.build()?,
)
.project(vec![col("id"), col("age")])?
.project(vec![wildcard()])?
.build()?;
let query_from_table_scan_with_two_projections =
plan_to_sql(&query_from_table_scan_with_two_projections)?;
assert_eq!(
query_from_table_scan_with_two_projections.to_string(),
"SELECT * FROM (SELECT t1.id, t1.age FROM t1)"
);

Expand Down

0 comments on commit a62033e

Please sign in to comment.