Use explicit enum for physical errors #2

Merged
merged 2 commits into from
Nov 3, 2024
5 changes: 4 additions & 1 deletion datafusion/common/src/display/mod.rs
@@ -62,6 +62,8 @@ pub enum PlanType {
FinalPhysicalPlanWithStats,
/// The final with schema, fully optimized physical plan which would be executed
FinalPhysicalPlanWithSchema,
/// An error creating the physical plan
PhysicalPlanError,
Author
By adding an explicit new "plan type" it became clear how to show the errors

Owner
Yes, that's what I wanted to do in the beginning, but I didn't want to introduce too much overhead, so I changed it. Sometimes I get confused about how to balance change size and simplicity. Thanks for your suggestion!

Author
Yeah, it is totally a judgement call -- I don't think there is ever one right answer

}

impl Display for PlanType {
@@ -91,6 +93,7 @@ impl Display for PlanType {
PlanType::FinalPhysicalPlanWithSchema => {
write!(f, "physical_plan_with_schema")
}
PlanType::PhysicalPlanError => write!(f, "physical_plan_error"),
}
}
}
@@ -118,7 +121,7 @@ impl StringifiedPlan {
/// `verbose_mode = true` will display all available plans
pub fn should_display(&self, verbose_mode: bool) -> bool {
match self.plan_type {
PlanType::FinalLogicalPlan | PlanType::FinalPhysicalPlan => true,
PlanType::FinalLogicalPlan | PlanType::FinalPhysicalPlan | PlanType::PhysicalPlanError => true,
_ => verbose_mode,
}
}
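
The effect of the new variant on display can be sketched in isolation. This is a minimal, hedged sketch rather than the DataFusion source: `PlanType` is reduced to the four variants named in this diff, and `StringifiedPlan` is simplified to hold a plain `String` (an assumption made for brevity).

```rust
use std::fmt::{self, Display, Formatter};

/// Reduced stand-in for `PlanType`: only the variants named in this diff.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PlanType {
    FinalLogicalPlan,
    FinalPhysicalPlan,
    FinalPhysicalPlanWithSchema,
    /// An error creating the physical plan
    PhysicalPlanError,
}

impl Display for PlanType {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        // Names match the strings that appear in the diff and test output.
        let name = match self {
            PlanType::FinalLogicalPlan => "logical_plan",
            PlanType::FinalPhysicalPlan => "physical_plan",
            PlanType::FinalPhysicalPlanWithSchema => "physical_plan_with_schema",
            PlanType::PhysicalPlanError => "physical_plan_error",
        };
        f.write_str(name)
    }
}

/// Simplified for this sketch: the plan text is a plain `String`.
pub struct StringifiedPlan {
    pub plan_type: PlanType,
    pub plan: String,
}

impl StringifiedPlan {
    pub fn new(plan_type: PlanType, plan: impl Into<String>) -> Self {
        Self { plan_type, plan: plan.into() }
    }

    /// Final plans and planning errors are always shown;
    /// everything else only when `verbose_mode` is true.
    pub fn should_display(&self, verbose_mode: bool) -> bool {
        match self.plan_type {
            PlanType::FinalLogicalPlan
            | PlanType::FinalPhysicalPlan
            | PlanType::PhysicalPlanError => true,
            _ => verbose_mode,
        }
    }
}
```

With this shape, a `PhysicalPlanError` entry is displayed in non-verbose `EXPLAIN` output, while a `FinalPhysicalPlanWithSchema` entry still requires verbose mode.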
14 changes: 5 additions & 9 deletions datafusion/core/src/physical_planner.rs
@@ -1798,18 +1798,14 @@ impl DefaultPhysicalPlanner {
}
}
Err(err) => {
return Ok(Some(Arc::new(ExplainExec::new(
SchemaRef::new(Schema::new(vec![arrow_schema::Field::new(
"Err",
arrow_schema::DataType::Utf8,
false,
)])),
vec![StringifiedPlan::new(FinalLogicalPlan, err.to_string())],
e.verbose,
))))
// use PhysicalPlanError so the error appears in the final output by default
// Initial plans are only shown in verbose mode
stringified_plans
.push(StringifiedPlan::new(PhysicalPlanError, err.to_string()));
}
}
}

Ok(Some(Arc::new(ExplainExec::new(
SchemaRef::new(e.schema.as_ref().to_owned().into()),
stringified_plans,
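
The control-flow change in this hunk (push the error as one more row instead of returning early with a one-column schema) can be sketched as follows. This is an illustration under stated assumptions: `explain_rows` is a hypothetical helper, not a DataFusion function, and the `Result<String, String>` argument stands in for the outcome of physical planning.

```rust
/// Reduced, hypothetical plan-type tag (the real enum has more variants).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PlanType {
    FinalLogicalPlan,
    FinalPhysicalPlan,
    PhysicalPlanError,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StringifiedPlan {
    pub plan_type: PlanType,
    pub plan: String,
}

/// Hypothetical helper: builds the EXPLAIN rows for one query.
/// `physical` stands in for the result of physical planning.
pub fn explain_rows(
    logical: &str,
    physical: Result<String, String>,
) -> Vec<StringifiedPlan> {
    // The logical plan is always recorded first.
    let mut stringified_plans = vec![StringifiedPlan {
        plan_type: PlanType::FinalLogicalPlan,
        plan: logical.to_string(),
    }];

    // A planning failure becomes one more row rather than an early return,
    // so the logical plan is kept alongside the error.
    match physical {
        Ok(plan) => stringified_plans.push(StringifiedPlan {
            plan_type: PlanType::FinalPhysicalPlan,
            plan,
        }),
        Err(err) => stringified_plans.push(StringifiedPlan {
            plan_type: PlanType::PhysicalPlanError,
            plan: err,
        }),
    }
    stringified_plans
}
```

Because both outcomes produce rows of the same two-column shape, the caller can build a single `ExplainExec` with the regular explain schema in either case.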
17 changes: 0 additions & 17 deletions datafusion/physical-plan/src/explain.rs
@@ -72,11 +72,6 @@ impl ExplainExec {
self.verbose
}

/// check if current plan is a failed explain plan
pub fn is_failed_explain(&self) -> bool {
self.stringified_plans.len() == 1 && self.schema.fields().len() == 1
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn compute_properties(schema: SchemaRef) -> PlanProperties {
let eq_properties = EquivalenceProperties::new(schema);
@@ -137,18 +132,6 @@ impl ExecutionPlan for ExplainExec {
if 0 != partition {
return internal_err!("ExplainExec invalid partition {partition}");
}
if self.is_failed_explain() {
let mut err_builder = StringBuilder::with_capacity(1, 1024);
err_builder.append_value(&*self.stringified_plans[0].plan);
let record_batch = RecordBatch::try_new(
Arc::clone(&self.schema),
vec![Arc::new(err_builder.finish())],
)?;
return Ok(Box::pin(RecordBatchStreamAdapter::new(
Arc::clone(&self.schema),
futures::stream::iter(vec![Ok(record_batch)]),
)));
}
let mut type_builder =
StringBuilder::with_capacity(self.stringified_plans.len(), 1024);
let mut plan_builder =
1 change: 1 addition & 0 deletions datafusion/proto/proto/datafusion.proto
@@ -655,6 +655,7 @@ message PlanType {
datafusion_common.EmptyMessage FinalPhysicalPlan = 6;
datafusion_common.EmptyMessage FinalPhysicalPlanWithStats = 10;
datafusion_common.EmptyMessage FinalPhysicalPlanWithSchema = 12;
datafusion_common.EmptyMessage PhysicalPlanError = 13;
}
}

13 changes: 13 additions & 0 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion datafusion/proto/src/logical_plan/from_proto.rs
@@ -44,7 +44,7 @@ use crate::protobuf::{
AnalyzedLogicalPlan, FinalAnalyzedLogicalPlan, FinalLogicalPlan,
FinalPhysicalPlan, FinalPhysicalPlanWithStats, InitialLogicalPlan,
InitialPhysicalPlan, InitialPhysicalPlanWithStats, OptimizedLogicalPlan,
OptimizedPhysicalPlan,
OptimizedPhysicalPlan, PhysicalPlanError
},
AnalyzedLogicalPlanType, CubeNode, GroupingSetNode, OptimizedLogicalPlanType,
OptimizedPhysicalPlanType, PlaceholderNode, RollupNode,
@@ -141,6 +141,7 @@ impl From<&protobuf::StringifiedPlan> for StringifiedPlan {
FinalPhysicalPlan(_) => PlanType::FinalPhysicalPlan,
FinalPhysicalPlanWithStats(_) => PlanType::FinalPhysicalPlanWithStats,
FinalPhysicalPlanWithSchema(_) => PlanType::FinalPhysicalPlanWithSchema,
PhysicalPlanError(_) => PlanType::PhysicalPlanError,
},
plan: Arc::new(stringified_plan.plan.clone()),
}
5 changes: 4 additions & 1 deletion datafusion/proto/src/logical_plan/to_proto.rs
@@ -38,6 +38,7 @@ use crate::protobuf::{
FinalPhysicalPlan, FinalPhysicalPlanWithSchema, FinalPhysicalPlanWithStats,
InitialLogicalPlan, InitialPhysicalPlan, InitialPhysicalPlanWithSchema,
InitialPhysicalPlanWithStats, OptimizedLogicalPlan, OptimizedPhysicalPlan,
PhysicalPlanError,
},
AnalyzedLogicalPlanType, CubeNode, EmptyMessage, GroupingSetNode, LogicalExprList,
OptimizedLogicalPlanType, OptimizedPhysicalPlanType, PlaceholderNode, RollupNode,
Expand Down Expand Up @@ -115,7 +116,9 @@ impl From<&StringifiedPlan> for protobuf::StringifiedPlan {
PlanType::FinalPhysicalPlanWithSchema => Some(protobuf::PlanType {
plan_type_enum: Some(FinalPhysicalPlanWithSchema(EmptyMessage {})),
}),
},
PlanType::PhysicalPlanError => Some(protobuf::PlanType {
plan_type_enum: Some(PhysicalPlanError(EmptyMessage {})),
}),
},
plan: stringified_plan.plan.to_string(),
}
}
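
The two proto mappings above are meant to be inverses of each other. A minimal round-trip sketch, assuming hand-written stand-ins for the generated types: the `proto` module, `to_proto`, and `from_proto` below are hypothetical names for illustration, not the prost-generated code or the real `From` impls.

```rust
/// In-memory plan type, reduced to two variants for the sketch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PlanType {
    FinalPhysicalPlanWithSchema,
    PhysicalPlanError,
}

/// Hypothetical stand-ins for the generated protobuf messages.
pub mod proto {
    #[derive(Debug, Clone, PartialEq, Eq)]
    pub struct EmptyMessage {}

    /// Mirrors the choice of variants in `message PlanType`.
    #[derive(Debug, Clone, PartialEq, Eq)]
    pub enum PlanTypeEnum {
        FinalPhysicalPlanWithSchema(EmptyMessage), // field 12 in the .proto
        PhysicalPlanError(EmptyMessage),           // field 13 in the .proto
    }

    #[derive(Debug, Clone, PartialEq, Eq)]
    pub struct PlanType {
        pub plan_type_enum: Option<PlanTypeEnum>,
    }
}

/// In-memory -> wire representation.
pub fn to_proto(plan_type: PlanType) -> proto::PlanType {
    use proto::{EmptyMessage, PlanTypeEnum};
    let inner = match plan_type {
        PlanType::FinalPhysicalPlanWithSchema => {
            PlanTypeEnum::FinalPhysicalPlanWithSchema(EmptyMessage {})
        }
        PlanType::PhysicalPlanError => PlanTypeEnum::PhysicalPlanError(EmptyMessage {}),
    };
    proto::PlanType { plan_type_enum: Some(inner) }
}

/// Wire -> in-memory representation. Returning `None` for an unset field
/// is a choice made for this sketch only.
pub fn from_proto(plan_type: &proto::PlanType) -> Option<PlanType> {
    use proto::PlanTypeEnum;
    match plan_type.plan_type_enum.as_ref()? {
        PlanTypeEnum::FinalPhysicalPlanWithSchema(_) => {
            Some(PlanType::FinalPhysicalPlanWithSchema)
        }
        PlanTypeEnum::PhysicalPlanError(_) => Some(PlanType::PhysicalPlanError),
    }
}
```

Because both `match` expressions are exhaustive, adding a variant to either enum without extending the corresponding mapping is a compile error.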
11 changes: 9 additions & 2 deletions datafusion/sqllogictest/test_files/explain.slt
@@ -419,10 +419,17 @@ create table t1(a int);
statement ok
create table t2(b int);

query T
query TT
explain select a from t1 where exists (select count(*) from t2);
----
This feature is not implemented: Physical plan does not support logical expression Exists(Exists { subquery: <subquery>, negated: false })
logical_plan
01)Filter: EXISTS (<subquery>)
02)--Subquery:
03)----Projection: count(*)
04)------Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]]
05)--------TableScan: t2
06)--TableScan: t1 projection=[a]
physical_plan_error This feature is not implemented: Physical plan does not support logical expression Exists(Exists { subquery: <subquery>, negated: false })

statement ok
drop table t1;
9 changes: 7 additions & 2 deletions datafusion/sqllogictest/test_files/group_by.slt
@@ -4070,14 +4070,19 @@ physical_plan
08)--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c, d], output_ordering=[c@0 ASC NULLS LAST], has_header=true

# we do not generate physical plan for Repartition yet (e.g Distribute By queries).
query T
query TT
EXPLAIN SELECT a, b, sum1
FROM (SELECT c, b, a, SUM(d) as sum1
FROM multiple_ordered_table_with_pk
GROUP BY c)
DISTRIBUTE BY a
----
This feature is not implemented: Physical plan does not support DistributeBy partitioning
logical_plan
01)Repartition: DistributeBy(multiple_ordered_table_with_pk.a)
02)--Projection: multiple_ordered_table_with_pk.a, multiple_ordered_table_with_pk.b, sum(multiple_ordered_table_with_pk.d) AS sum1
03)----Aggregate: groupBy=[[multiple_ordered_table_with_pk.c, multiple_ordered_table_with_pk.a, multiple_ordered_table_with_pk.b]], aggr=[[sum(CAST(multiple_ordered_table_with_pk.d AS Int64))]]
04)------TableScan: multiple_ordered_table_with_pk projection=[a, b, c, d]
physical_plan_error This feature is not implemented: Physical plan does not support DistributeBy partitioning

# union with aggregate
query TT
28 changes: 24 additions & 4 deletions datafusion/sqllogictest/test_files/joins.slt
@@ -4049,10 +4049,20 @@ physical_plan


# Test CROSS JOIN LATERAL syntax (planning)
query T
query TT
explain select t1_id, t1_name, i from join_t1 t1 cross join lateral (select * from unnest(generate_series(1, t1_int))) as series(i);
----
This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(UInt32, Column { relation: Some(Bare { table: "t1" }), name: "t1_int" })
logical_plan
01)Cross Join:
02)--SubqueryAlias: t1
03)----TableScan: join_t1 projection=[t1_id, t1_name]
04)--SubqueryAlias: series
05)----Subquery:
06)------Projection: unnest_placeholder(generate_series(Int64(1),outer_ref(t1.t1_int)),depth=1) AS i
07)--------Unnest: lists[unnest_placeholder(generate_series(Int64(1),outer_ref(t1.t1_int)))|depth=1] structs[]
08)----------Projection: generate_series(Int64(1), CAST(outer_ref(t1.t1_int) AS Int64)) AS unnest_placeholder(generate_series(Int64(1),outer_ref(t1.t1_int)))
09)------------EmptyRelation
physical_plan_error This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(UInt32, Column { relation: Some(Bare { table: "t1" }), name: "t1_int" })


# Test CROSS JOIN LATERAL syntax (execution)
@@ -4062,10 +4072,20 @@ select t1_id, t1_name, i from join_t1 t1 cross join lateral (select * from unnes


# Test INNER JOIN LATERAL syntax (planning)
query T
query TT
explain select t1_id, t1_name, i from join_t1 t2 inner join lateral (select * from unnest(generate_series(1, t1_int))) as series(i) on(t1_id > i);
----
This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(UInt32, Column { relation: Some(Bare { table: "t2" }), name: "t1_int" })
logical_plan
01)Inner Join: Filter: CAST(t2.t1_id AS Int64) > series.i
02)--SubqueryAlias: t2
03)----TableScan: join_t1 projection=[t1_id, t1_name]
04)--SubqueryAlias: series
05)----Subquery:
06)------Projection: unnest_placeholder(generate_series(Int64(1),outer_ref(t2.t1_int)),depth=1) AS i
07)--------Unnest: lists[unnest_placeholder(generate_series(Int64(1),outer_ref(t2.t1_int)))|depth=1] structs[]
08)----------Projection: generate_series(Int64(1), CAST(outer_ref(t2.t1_int) AS Int64)) AS unnest_placeholder(generate_series(Int64(1),outer_ref(t2.t1_int)))
09)------------EmptyRelation
physical_plan_error This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(UInt32, Column { relation: Some(Bare { table: "t2" }), name: "t1_int" })


# Test INNER JOIN LATERAL syntax (execution)
2 changes: 2 additions & 0 deletions datafusion/sqllogictest/test_files/prepare.slt
@@ -86,11 +86,13 @@ query TT
EXPLAIN EXECUTE my_plan;
----
logical_plan Execute: my_plan params=[]
physical_plan_error This feature is not implemented: Unsupported logical plan: Execute

query TT
EXPLAIN EXECUTE my_plan(10*2 + 1, 'Foo');
----
logical_plan Execute: my_plan params=[Int64(21), Utf8("Foo")]
physical_plan_error This feature is not implemented: Unsupported logical plan: Execute

query error DataFusion error: Schema error: No field named a\.
EXPLAIN EXECUTE my_plan(a);
52 changes: 42 additions & 10 deletions datafusion/sqllogictest/test_files/update.slt
@@ -26,30 +26,54 @@ create table t1(a int, b varchar, c double, d int);
statement ok
set datafusion.optimizer.max_passes = 0;

query T
query TT
explain update t1 set a=1, b=2, c=3.0, d=NULL;
----
This feature is not implemented: Unsupported logical plan: Dml(Update)
logical_plan
01)Dml: op=[Update] table=[t1]
02)--Projection: CAST(Int64(1) AS Int32) AS a, CAST(Int64(2) AS Utf8) AS b, Float64(3) AS c, CAST(NULL AS Int32) AS d
03)----TableScan: t1
physical_plan_error This feature is not implemented: Unsupported logical plan: Dml(Update)

query T
query TT
explain update t1 set a=c+1, b=a, c=c+1.0, d=b;
----
This feature is not implemented: Unsupported logical plan: Dml(Update)
logical_plan
01)Dml: op=[Update] table=[t1]
02)--Projection: CAST(t1.c + CAST(Int64(1) AS Float64) AS Int32) AS a, CAST(t1.a AS Utf8) AS b, t1.c + Float64(1) AS c, CAST(t1.b AS Int32) AS d
03)----TableScan: t1
physical_plan_error This feature is not implemented: Unsupported logical plan: Dml(Update)

statement ok
create table t2(a int, b varchar, c double, d int);

## set from subquery
query T
query TT
explain update t1 set b = (select max(b) from t2 where t1.a = t2.a)
----
This feature is not implemented: Physical plan does not support logical expression ScalarSubquery(<subquery>)
logical_plan
01)Dml: op=[Update] table=[t1]
02)--Projection: t1.a AS a, (<subquery>) AS b, t1.c AS c, t1.d AS d
03)----Subquery:
04)------Projection: max(t2.b)
05)--------Aggregate: groupBy=[[]], aggr=[[max(t2.b)]]
06)----------Filter: outer_ref(t1.a) = t2.a
07)------------TableScan: t2
08)----TableScan: t1
physical_plan_error This feature is not implemented: Physical plan does not support logical expression ScalarSubquery(<subquery>)

# set from other table
query T
query TT
explain update t1 set b = t2.b, c = t2.a, d = 1 from t2 where t1.a = t2.a and t1.b > 'foo' and t2.c > 1.0;
----
This feature is not implemented: Unsupported logical plan: Dml(Update)
logical_plan
01)Dml: op=[Update] table=[t1]
02)--Projection: t1.a AS a, t2.b AS b, CAST(t2.a AS Float64) AS c, CAST(Int64(1) AS Int32) AS d
03)----Filter: t1.a = t2.a AND t1.b > Utf8("foo") AND t2.c > Float64(1)
04)------Cross Join:
05)--------TableScan: t1
06)--------TableScan: t2
physical_plan_error This feature is not implemented: Unsupported logical plan: Dml(Update)
Author
This way we still see the logical plan in the explain output, but now it is also clear that there is a physical plan error.


statement ok
create table t3(a int, b varchar, c double, d int);
@@ -59,7 +83,15 @@ query error DataFusion error: SQL error: ParserError\("Expected end of statement
explain update t1 set b = t2.b, c = t3.a, d = 1 from t2, t3 where t1.a = t2.a and t1.a = t3.a;

# test table alias
query T
query TT
explain update t1 as T set b = t2.b, c = t.a, d = 1 from t2 where t.a = t2.a and t.b > 'foo' and t2.c > 1.0;
----
This feature is not implemented: Unsupported logical plan: Dml(Update)
logical_plan
01)Dml: op=[Update] table=[t1]
02)--Projection: t.a AS a, t2.b AS b, CAST(t.a AS Float64) AS c, CAST(Int64(1) AS Int32) AS d
03)----Filter: t.a = t2.a AND t.b > Utf8("foo") AND t2.c > Float64(1)
04)------Cross Join:
05)--------SubqueryAlias: t
06)----------TableScan: t1
07)--------TableScan: t2
physical_plan_error This feature is not implemented: Unsupported logical plan: Dml(Update)