Skip to content

Commit

Permalink
Fix EXPLAIN bug with non-correlated subquery
Browse files Browse the repository at this point in the history
  • Loading branch information
Lordworms committed Oct 31, 2024
1 parent 813220d commit 95e635f
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 75 deletions.
12 changes: 6 additions & 6 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -880,18 +880,18 @@ impl TableProvider for ListingTable {
None => {} // no ordering required
};

let filters = conjunction(filters.to_vec())
.map(|expr| -> Result<_> {
// NOTE: Use the table schema (NOT file schema) here because `expr` may contain references to partition columns.
let filters = match conjunction(filters.to_vec()) {
Some(expr) => {
let table_df_schema = self.table_schema.as_ref().clone().to_dfschema()?;
let filters = create_physical_expr(
&expr,
&table_df_schema,
state.execution_props(),
)?;
Ok(Some(filters))
})
.unwrap_or(Ok(None))?;
Some(filters)
}
None => None,
};

let Some(object_store_url) =
self.table_paths.first().map(ListingTableUrl::object_store)
Expand Down
14 changes: 11 additions & 3 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1792,11 +1792,19 @@ impl DefaultPhysicalPlanner {
Err(e) => return Err(e),
}
}
Err(e) => stringified_plans
.push(StringifiedPlan::new(InitialPhysicalPlan, e.to_string())),
Err(err) => {
return Ok(Some(Arc::new(ExplainExec::new(
SchemaRef::new(Schema::new(vec![arrow_schema::Field::new(
"Err",
arrow_schema::DataType::Utf8,
false,
)])),
vec![StringifiedPlan::new(FinalLogicalPlan, err.to_string())],
e.verbose,
))))
}
}
}

Ok(Some(Arc::new(ExplainExec::new(
SchemaRef::new(e.schema.as_ref().to_owned().into()),
stringified_plans,
Expand Down
18 changes: 17 additions & 1 deletion datafusion/physical-plan/src/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ impl ExplainExec {
self.verbose
}

/// Reports whether this node represents an explain whose planning failed.
///
/// The check is purely structural: it returns `true` only when exactly one
/// stringified plan is stored and the output schema has exactly one field.
pub fn is_failed_explain(&self) -> bool {
    let plan_count = self.stringified_plans.len();
    let field_count = self.schema.fields().len();
    matches!((plan_count, field_count), (1, 1))
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn compute_properties(schema: SchemaRef) -> PlanProperties {
let eq_properties = EquivalenceProperties::new(schema);
Expand Down Expand Up @@ -132,7 +137,18 @@ impl ExecutionPlan for ExplainExec {
if 0 != partition {
return internal_err!("ExplainExec invalid partition {partition}");
}

if self.is_failed_explain() {
let mut err_builder = StringBuilder::with_capacity(1, 1024);
err_builder.append_value(&*self.stringified_plans[0].plan);
let record_batch = RecordBatch::try_new(
Arc::clone(&self.schema),
vec![Arc::new(err_builder.finish())],
)?;
return Ok(Box::pin(RecordBatchStreamAdapter::new(
Arc::clone(&self.schema),
futures::stream::iter(vec![Ok(record_batch)]),
)));
}
let mut type_builder =
StringBuilder::with_capacity(self.stringified_plans.len(), 1024);
let mut plan_builder =
Expand Down
18 changes: 18 additions & 0 deletions datafusion/sqllogictest/test_files/explain.slt
Original file line number Diff line number Diff line change
Expand Up @@ -411,3 +411,21 @@ logical_plan
physical_plan
01)ProjectionExec: expr=[{c0:1,c1:2.3,c2:abc} as struct(Int64(1),Float64(2.3),Utf8("abc"))]
02)--PlaceholderRowExec


statement ok
create table t1(a int);

statement ok
create table t2(b int);

query T
explain select a from t1 where exists (select count(*) from t2);
----
This feature is not implemented: Physical plan does not support logical expression Exists(Exists { subquery: <subquery>, negated: false })

statement ok
drop table t1;

statement ok
drop table t2;
8 changes: 2 additions & 6 deletions datafusion/sqllogictest/test_files/group_by.slt
Original file line number Diff line number Diff line change
Expand Up @@ -4070,18 +4070,14 @@ physical_plan
08)--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c, d], output_ordering=[c@0 ASC NULLS LAST], has_header=true

# We do not generate a physical plan for Repartition yet (e.g., DISTRIBUTE BY queries).
query TT
query T
EXPLAIN SELECT a, b, sum1
FROM (SELECT c, b, a, SUM(d) as sum1
FROM multiple_ordered_table_with_pk
GROUP BY c)
DISTRIBUTE BY a
----
logical_plan
01)Repartition: DistributeBy(multiple_ordered_table_with_pk.a)
02)--Projection: multiple_ordered_table_with_pk.a, multiple_ordered_table_with_pk.b, sum(multiple_ordered_table_with_pk.d) AS sum1
03)----Aggregate: groupBy=[[multiple_ordered_table_with_pk.c, multiple_ordered_table_with_pk.a, multiple_ordered_table_with_pk.b]], aggr=[[sum(CAST(multiple_ordered_table_with_pk.d AS Int64))]]
04)------TableScan: multiple_ordered_table_with_pk projection=[a, b, c, d]
This feature is not implemented: Physical plan does not support DistributeBy partitioning

# union with aggregate
query TT
Expand Down
26 changes: 4 additions & 22 deletions datafusion/sqllogictest/test_files/joins.slt
Original file line number Diff line number Diff line change
Expand Up @@ -4049,19 +4049,10 @@ physical_plan


# Test CROSS JOIN LATERAL syntax (planning)
query TT
query T
explain select t1_id, t1_name, i from join_t1 t1 cross join lateral (select * from unnest(generate_series(1, t1_int))) as series(i);
----
logical_plan
01)Cross Join:
02)--SubqueryAlias: t1
03)----TableScan: join_t1 projection=[t1_id, t1_name]
04)--SubqueryAlias: series
05)----Subquery:
06)------Projection: unnest_placeholder(generate_series(Int64(1),outer_ref(t1.t1_int)),depth=1) AS i
07)--------Unnest: lists[unnest_placeholder(generate_series(Int64(1),outer_ref(t1.t1_int)))|depth=1] structs[]
08)----------Projection: generate_series(Int64(1), CAST(outer_ref(t1.t1_int) AS Int64)) AS unnest_placeholder(generate_series(Int64(1),outer_ref(t1.t1_int)))
09)------------EmptyRelation
This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(UInt32, Column { relation: Some(Bare { table: "t1" }), name: "t1_int" })


# Test CROSS JOIN LATERAL syntax (execution)
Expand All @@ -4071,19 +4062,10 @@ select t1_id, t1_name, i from join_t1 t1 cross join lateral (select * from unnes


# Test INNER JOIN LATERAL syntax (planning)
query TT
query T
explain select t1_id, t1_name, i from join_t1 t2 inner join lateral (select * from unnest(generate_series(1, t1_int))) as series(i) on(t1_id > i);
----
logical_plan
01)Inner Join: Filter: CAST(t2.t1_id AS Int64) > series.i
02)--SubqueryAlias: t2
03)----TableScan: join_t1 projection=[t1_id, t1_name]
04)--SubqueryAlias: series
05)----Subquery:
06)------Projection: unnest_placeholder(generate_series(Int64(1),outer_ref(t2.t1_int)),depth=1) AS i
07)--------Unnest: lists[unnest_placeholder(generate_series(Int64(1),outer_ref(t2.t1_int)))|depth=1] structs[]
08)----------Projection: generate_series(Int64(1), CAST(outer_ref(t2.t1_int) AS Int64)) AS unnest_placeholder(generate_series(Int64(1),outer_ref(t2.t1_int)))
09)------------EmptyRelation
This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(UInt32, Column { relation: Some(Bare { table: "t2" }), name: "t1_int" })


# Test INNER JOIN LATERAL syntax (execution)
Expand Down
47 changes: 10 additions & 37 deletions datafusion/sqllogictest/test_files/update.slt
Original file line number Diff line number Diff line change
Expand Up @@ -26,50 +26,30 @@ create table t1(a int, b varchar, c double, d int);
statement ok
set datafusion.optimizer.max_passes = 0;

query TT
query T
explain update t1 set a=1, b=2, c=3.0, d=NULL;
----
logical_plan
01)Dml: op=[Update] table=[t1]
02)--Projection: CAST(Int64(1) AS Int32) AS a, CAST(Int64(2) AS Utf8) AS b, Float64(3) AS c, CAST(NULL AS Int32) AS d
03)----TableScan: t1
This feature is not implemented: Unsupported logical plan: Dml(Update)

query TT
query T
explain update t1 set a=c+1, b=a, c=c+1.0, d=b;
----
logical_plan
01)Dml: op=[Update] table=[t1]
02)--Projection: CAST(t1.c + CAST(Int64(1) AS Float64) AS Int32) AS a, CAST(t1.a AS Utf8) AS b, t1.c + Float64(1) AS c, CAST(t1.b AS Int32) AS d
03)----TableScan: t1
This feature is not implemented: Unsupported logical plan: Dml(Update)

statement ok
create table t2(a int, b varchar, c double, d int);

## set from subquery
query TT
query T
explain update t1 set b = (select max(b) from t2 where t1.a = t2.a)
----
logical_plan
01)Dml: op=[Update] table=[t1]
02)--Projection: t1.a AS a, (<subquery>) AS b, t1.c AS c, t1.d AS d
03)----Subquery:
04)------Projection: max(t2.b)
05)--------Aggregate: groupBy=[[]], aggr=[[max(t2.b)]]
06)----------Filter: outer_ref(t1.a) = t2.a
07)------------TableScan: t2
08)----TableScan: t1
This feature is not implemented: Physical plan does not support logical expression ScalarSubquery(<subquery>)

# set from other table
query TT
query T
explain update t1 set b = t2.b, c = t2.a, d = 1 from t2 where t1.a = t2.a and t1.b > 'foo' and t2.c > 1.0;
----
logical_plan
01)Dml: op=[Update] table=[t1]
02)--Projection: t1.a AS a, t2.b AS b, CAST(t2.a AS Float64) AS c, CAST(Int64(1) AS Int32) AS d
03)----Filter: t1.a = t2.a AND t1.b > Utf8("foo") AND t2.c > Float64(1)
04)------Cross Join:
05)--------TableScan: t1
06)--------TableScan: t2
This feature is not implemented: Unsupported logical plan: Dml(Update)

statement ok
create table t3(a int, b varchar, c double, d int);
Expand All @@ -79,14 +59,7 @@ query error DataFusion error: SQL error: ParserError\("Expected end of statement
explain update t1 set b = t2.b, c = t3.a, d = 1 from t2, t3 where t1.a = t2.a and t1.a = t3.a;

# test table alias
query TT
query T
explain update t1 as T set b = t2.b, c = t.a, d = 1 from t2 where t.a = t2.a and t.b > 'foo' and t2.c > 1.0;
----
logical_plan
01)Dml: op=[Update] table=[t1]
02)--Projection: t.a AS a, t2.b AS b, CAST(t.a AS Float64) AS c, CAST(Int64(1) AS Int32) AS d
03)----Filter: t.a = t2.a AND t.b > Utf8("foo") AND t2.c > Float64(1)
04)------Cross Join:
05)--------SubqueryAlias: t
06)----------TableScan: t1
07)--------TableScan: t2
This feature is not implemented: Unsupported logical plan: Dml(Update)

0 comments on commit 95e635f

Please sign in to comment.