Skip to content

Commit

Permalink
Add LogicalPlan::CreateIndex (apache#11817)
Browse files Browse the repository at this point in the history
* Add create index plan

* Fix clippy lints
  • Loading branch information
lewiszlw authored Aug 7, 2024
1 parent bddb641 commit 117ab1b
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 27 deletions.
29 changes: 17 additions & 12 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -544,30 +544,35 @@ impl SessionContext {
// stack overflows.
match ddl {
DdlStatement::CreateExternalTable(cmd) => {
Box::pin(async move { self.create_external_table(&cmd).await })
as std::pin::Pin<Box<dyn futures::Future<Output = _> + Send>>
(Box::pin(async move { self.create_external_table(&cmd).await })
as std::pin::Pin<Box<dyn futures::Future<Output = _> + Send>>)
.await
}
DdlStatement::CreateMemoryTable(cmd) => {
Box::pin(self.create_memory_table(cmd))
Box::pin(self.create_memory_table(cmd)).await
}
DdlStatement::CreateView(cmd) => {
Box::pin(self.create_view(cmd)).await
}
DdlStatement::CreateView(cmd) => Box::pin(self.create_view(cmd)),
DdlStatement::CreateCatalogSchema(cmd) => {
Box::pin(self.create_catalog_schema(cmd))
Box::pin(self.create_catalog_schema(cmd)).await
}
DdlStatement::CreateCatalog(cmd) => {
Box::pin(self.create_catalog(cmd))
Box::pin(self.create_catalog(cmd)).await
}
DdlStatement::DropTable(cmd) => Box::pin(self.drop_table(cmd)),
DdlStatement::DropView(cmd) => Box::pin(self.drop_view(cmd)),
DdlStatement::DropTable(cmd) => Box::pin(self.drop_table(cmd)).await,
DdlStatement::DropView(cmd) => Box::pin(self.drop_view(cmd)).await,
DdlStatement::DropCatalogSchema(cmd) => {
Box::pin(self.drop_schema(cmd))
Box::pin(self.drop_schema(cmd)).await
}
DdlStatement::CreateFunction(cmd) => {
Box::pin(self.create_function(cmd))
Box::pin(self.create_function(cmd)).await
}
DdlStatement::DropFunction(cmd) => {
Box::pin(self.drop_function(cmd)).await
}
DdlStatement::DropFunction(cmd) => Box::pin(self.drop_function(cmd)),
ddl => Ok(DataFrame::new(self.state(), LogicalPlan::Ddl(ddl))),
}
.await
}
// TODO what about the other statements (like TransactionStart and TransactionEnd)
LogicalPlan::Statement(Statement::SetVariable(stmt)) => {
Expand Down
19 changes: 19 additions & 0 deletions datafusion/expr/src/logical_plan/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ pub enum DdlStatement {
CreateCatalogSchema(CreateCatalogSchema),
/// Creates a new catalog (aka "Database").
CreateCatalog(CreateCatalog),
/// Creates a new index.
CreateIndex(CreateIndex),
/// Drops a table.
DropTable(DropTable),
/// Drops a view.
Expand All @@ -66,6 +68,7 @@ impl DdlStatement {
schema
}
DdlStatement::CreateCatalog(CreateCatalog { schema, .. }) => schema,
DdlStatement::CreateIndex(CreateIndex { schema, .. }) => schema,
DdlStatement::DropTable(DropTable { schema, .. }) => schema,
DdlStatement::DropView(DropView { schema, .. }) => schema,
DdlStatement::DropCatalogSchema(DropCatalogSchema { schema, .. }) => schema,
Expand All @@ -83,6 +86,7 @@ impl DdlStatement {
DdlStatement::CreateView(_) => "CreateView",
DdlStatement::CreateCatalogSchema(_) => "CreateCatalogSchema",
DdlStatement::CreateCatalog(_) => "CreateCatalog",
DdlStatement::CreateIndex(_) => "CreateIndex",
DdlStatement::DropTable(_) => "DropTable",
DdlStatement::DropView(_) => "DropView",
DdlStatement::DropCatalogSchema(_) => "DropCatalogSchema",
Expand All @@ -101,6 +105,7 @@ impl DdlStatement {
vec![input]
}
DdlStatement::CreateView(CreateView { input, .. }) => vec![input],
DdlStatement::CreateIndex(_) => vec![],
DdlStatement::DropTable(_) => vec![],
DdlStatement::DropView(_) => vec![],
DdlStatement::DropCatalogSchema(_) => vec![],
Expand Down Expand Up @@ -147,6 +152,9 @@ impl DdlStatement {
}) => {
write!(f, "CreateCatalog: {catalog_name:?}")
}
DdlStatement::CreateIndex(CreateIndex { name, .. }) => {
write!(f, "CreateIndex: {name:?}")
}
DdlStatement::DropTable(DropTable {
name, if_exists, ..
}) => {
Expand Down Expand Up @@ -351,3 +359,14 @@ pub struct DropFunction {
pub if_exists: bool,
pub schema: DFSchemaRef,
}

#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct CreateIndex {
pub name: Option<String>,
pub table: TableReference,
pub using: Option<String>,
pub columns: Vec<Expr>,
pub unique: bool,
pub if_not_exists: bool,
pub schema: DFSchemaRef,
}
4 changes: 2 additions & 2 deletions datafusion/expr/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ pub use builder::{
};
pub use ddl::{
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateFunction,
CreateFunctionBody, CreateMemoryTable, CreateView, DdlStatement, DropCatalogSchema,
DropFunction, DropTable, DropView, OperateFunctionArg,
CreateFunctionBody, CreateIndex, CreateMemoryTable, CreateView, DdlStatement,
DropCatalogSchema, DropFunction, DropTable, DropView, OperateFunctionArg,
};
pub use dml::{DmlStatement, WriteOp};
pub use plan::{
Expand Down
1 change: 1 addition & 0 deletions datafusion/expr/src/logical_plan/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ impl TreeNode for LogicalPlan {
DdlStatement::CreateExternalTable(_)
| DdlStatement::CreateCatalogSchema(_)
| DdlStatement::CreateCatalog(_)
| DdlStatement::CreateIndex(_)
| DdlStatement::DropTable(_)
| DdlStatement::DropView(_)
| DdlStatement::DropCatalogSchema(_)
Expand Down
3 changes: 3 additions & 0 deletions datafusion/proto/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1645,6 +1645,9 @@ impl AsLogicalPlan for LogicalPlanNode {
LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(_)) => Err(proto_error(
"LogicalPlan serde is not yet implemented for CreateMemoryTable",
)),
LogicalPlan::Ddl(DdlStatement::CreateIndex(_)) => Err(proto_error(
"LogicalPlan serde is not yet implemented for CreateIndex",
)),
LogicalPlan::Ddl(DdlStatement::DropTable(_)) => Err(proto_error(
"LogicalPlan serde is not yet implemented for DropTable",
)),
Expand Down
58 changes: 47 additions & 11 deletions datafusion/sql/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,20 @@ use datafusion_expr::utils::expr_to_columns;
use datafusion_expr::{
cast, col, Analyze, CreateCatalog, CreateCatalogSchema,
CreateExternalTable as PlanCreateExternalTable, CreateFunction, CreateFunctionBody,
CreateMemoryTable, CreateView, DescribeTable, DmlStatement, DropCatalogSchema,
DropFunction, DropTable, DropView, EmptyRelation, Explain, Expr, ExprSchemable,
Filter, LogicalPlan, LogicalPlanBuilder, OperateFunctionArg, PlanType, Prepare,
SetVariable, Statement as PlanStatement, ToStringifiedPlan, TransactionAccessMode,
TransactionConclusion, TransactionEnd, TransactionIsolationLevel, TransactionStart,
Volatility, WriteOp,
CreateIndex as PlanCreateIndex, CreateMemoryTable, CreateView, DescribeTable,
DmlStatement, DropCatalogSchema, DropFunction, DropTable, DropView, EmptyRelation,
Explain, Expr, ExprSchemable, Filter, LogicalPlan, LogicalPlanBuilder,
OperateFunctionArg, PlanType, Prepare, SetVariable, Statement as PlanStatement,
ToStringifiedPlan, TransactionAccessMode, TransactionConclusion, TransactionEnd,
TransactionIsolationLevel, TransactionStart, Volatility, WriteOp,
};
use sqlparser::ast;
use sqlparser::ast::{
Assignment, AssignmentTarget, ColumnDef, CreateTable, CreateTableOptions, Delete,
DescribeAlias, Expr as SQLExpr, FromTable, Ident, Insert, ObjectName, ObjectType,
OneOrManyWithParens, Query, SchemaName, SetExpr, ShowCreateObject,
ShowStatementFilter, Statement, TableConstraint, TableFactor, TableWithJoins,
TransactionMode, UnaryOperator, Value,
Assignment, AssignmentTarget, ColumnDef, CreateIndex, CreateTable,
CreateTableOptions, Delete, DescribeAlias, Expr as SQLExpr, FromTable, Ident, Insert,
ObjectName, ObjectType, OneOrManyWithParens, Query, SchemaName, SetExpr,
ShowCreateObject, ShowStatementFilter, Statement, TableConstraint, TableFactor,
TableWithJoins, TransactionMode, UnaryOperator, Value,
};
use sqlparser::parser::ParserError::ParserError;

Expand Down Expand Up @@ -769,6 +769,42 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
exec_err!("Function name not provided")
}
}
Statement::CreateIndex(CreateIndex {
name,
table_name,
using,
columns,
unique,
if_not_exists,
..
}) => {
let name: Option<String> = name.as_ref().map(object_name_to_string);
let table = self.object_name_to_table_reference(table_name)?;
let table_schema = self
.context_provider
.get_table_source(table.clone())?
.schema()
.to_dfschema_ref()?;
let using: Option<String> = using.as_ref().map(ident_to_string);
let columns = self.order_by_to_sort_expr(
columns,
&table_schema,
planner_context,
false,
None,
)?;
Ok(LogicalPlan::Ddl(DdlStatement::CreateIndex(
PlanCreateIndex {
name,
table,
using,
columns,
unique,
if_not_exists,
schema: DFSchemaRef::new(DFSchema::empty()),
},
)))
}
_ => {
not_impl_err!("Unsupported SQL statement: {sql:?}")
}
Expand Down
34 changes: 32 additions & 2 deletions datafusion/sql/tests/sql_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@ use datafusion_common::{
assert_contains, DataFusionError, ParamValues, Result, ScalarValue,
};
use datafusion_expr::{
col,
dml::CopyTo,
logical_plan::{LogicalPlan, Prepare},
test::function_stub::sum_udaf,
ColumnarValue, CreateExternalTable, DdlStatement, ScalarUDF, ScalarUDFImpl,
Signature, Volatility,
ColumnarValue, CreateExternalTable, CreateIndex, DdlStatement, ScalarUDF,
ScalarUDFImpl, Signature, Volatility,
};
use datafusion_functions::{string, unicode};
use datafusion_sql::{
Expand Down Expand Up @@ -4426,6 +4427,35 @@ fn test_parse_escaped_string_literal_value() {
)
}

#[test]
fn plan_create_index() {
let sql =
"CREATE UNIQUE INDEX IF NOT EXISTS idx_name ON test USING btree (name, age DESC)";
let plan = logical_plan_with_options(sql, ParserOptions::default()).unwrap();
match plan {
LogicalPlan::Ddl(DdlStatement::CreateIndex(CreateIndex {
name,
table,
using,
columns,
unique,
if_not_exists,
..
})) => {
assert_eq!(name, Some("idx_name".to_string()));
assert_eq!(format!("{table}"), "test");
assert_eq!(using, Some("btree".to_string()));
assert_eq!(
columns,
vec![col("name").sort(true, false), col("age").sort(false, true),]
);
assert!(unique);
assert!(if_not_exists);
}
_ => panic!("wrong plan type"),
}
}

fn assert_field_not_found(err: DataFusionError, name: &str) {
match err {
DataFusionError::SchemaError { .. } => {
Expand Down

0 comments on commit 117ab1b

Please sign in to comment.