Skip to content

Commit

Permalink
feat: impl View as data source
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould committed Nov 2, 2024
1 parent a828a57 commit a754969
Show file tree
Hide file tree
Showing 66 changed files with 659 additions and 408 deletions.
2 changes: 1 addition & 1 deletion rust-toolchain
Original file line number Diff line number Diff line change
@@ -1 +1 @@
nightly-2024-04-27
nightly-2024-10-10
2 changes: 1 addition & 1 deletion src/binder/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::{

use super::{Binder, QueryBindStep};

impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
impl<T: Transaction> Binder<'_, '_, T> {
pub fn bind_aggregate(
&mut self,
children: LogicalPlan,
Expand Down
4 changes: 2 additions & 2 deletions src/binder/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::planner::operator::Operator;
use crate::planner::LogicalPlan;
use crate::storage::Transaction;

impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
impl<T: Transaction> Binder<'_, '_, T> {
pub(crate) fn bind_alter_table(
&mut self,
name: &ObjectName,
Expand All @@ -21,7 +21,7 @@ impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
let table_name: Arc<String> = Arc::new(lower_case_name(name)?);
let table = self
.context
.table(table_name.clone())
.table(table_name.clone())?
.ok_or(DatabaseError::TableNotFound)?;
let plan = match operation {
AlterTableOperation::AddColumn {
Expand Down
20 changes: 14 additions & 6 deletions src/binder/analyze.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::binder::{lower_case_name, Binder};
use crate::binder::{lower_case_name, Binder, Source};
use crate::errors::DatabaseError;
use crate::planner::operator::analyze::AnalyzeOperator;
use crate::planner::operator::table_scan::TableScanOperator;
Expand All @@ -8,16 +8,24 @@ use crate::storage::Transaction;
use sqlparser::ast::ObjectName;
use std::sync::Arc;

impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
impl<T: Transaction> Binder<'_, '_, T> {
pub(crate) fn bind_analyze(&mut self, name: &ObjectName) -> Result<LogicalPlan, DatabaseError> {
let table_name = Arc::new(lower_case_name(name)?);

let table_catalog = self
let table = self
.context
.table_and_bind(table_name.clone(), None, None)?;
let index_metas = table_catalog.indexes.clone();
.source_and_bind(table_name.clone(), None, None, true)?
.and_then(|source| {
if let Source::Table(table) = source {
Some(table)
} else {
None
}
})
.ok_or(DatabaseError::TableNotFound)?;
let index_metas = table.indexes.clone();

let scan_op = TableScanOperator::build(table_name.clone(), table_catalog);
let scan_op = TableScanOperator::build(table_name.clone(), table);
Ok(LogicalPlan::new(
Operator::Analyze(AnalyzeOperator {
table_name,
Expand Down
4 changes: 2 additions & 2 deletions src/binder/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl FromStr for ExtSource {
}
}

impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
impl<T: Transaction> Binder<'_, '_, T> {
pub(super) fn bind_copy(
&mut self,
source: CopySource,
Expand All @@ -80,7 +80,7 @@ impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
}
};

if let Some(table) = self.context.table(Arc::new(table_name.to_string())) {
if let Some(table) = self.context.table(Arc::new(table_name.to_string()))? {
let schema_ref = table.schema_ref().clone();
let ext_source = ExtSource {
path: match target {
Expand Down
14 changes: 9 additions & 5 deletions src/binder/create_index.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::binder::{lower_case_name, Binder};
use crate::binder::{lower_case_name, Binder, Source};
use crate::errors::DatabaseError;
use crate::expression::ScalarExpression;
use crate::planner::operator::create_index::CreateIndexOperator;
Expand All @@ -10,7 +10,7 @@ use crate::types::index::IndexType;
use sqlparser::ast::{ObjectName, OrderByExpr};
use std::sync::Arc;

impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
impl<T: Transaction> Binder<'_, '_, T> {
pub(crate) fn bind_create_index(
&mut self,
table_name: &ObjectName,
Expand All @@ -29,10 +29,14 @@ impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
IndexType::Composite
};

let table = self
let source = self
.context
.table_and_bind(table_name.clone(), None, None)?;
let plan = TableScanOperator::build(table_name.clone(), table);
.source_and_bind(table_name.clone(), None, None, false)?
.ok_or(DatabaseError::SourceNotFound)?;
let plan = match source {
Source::Table(table) => TableScanOperator::build(table_name.clone(), table),
Source::View(view) => LogicalPlan::clone(&view.plan),
};
let mut columns = Vec::with_capacity(exprs.len());

for expr in exprs {
Expand Down
4 changes: 3 additions & 1 deletion src/binder/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::planner::LogicalPlan;
use crate::storage::Transaction;
use crate::types::LogicalType;

impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
impl<T: Transaction> Binder<'_, '_, T> {
// TODO: TableConstraint
pub(crate) fn bind_create_table(
&mut self,
Expand Down Expand Up @@ -158,13 +158,15 @@ mod tests {
let storage = RocksStorage::new(temp_dir.path())?;
let transaction = storage.transaction()?;
let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?);
let view_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?);
let scala_functions = Default::default();
let table_functions = Default::default();

let sql = "create table t1 (id int primary key, name varchar(10) null)";
let mut binder = Binder::new(
BinderContext::new(
&table_cache,
&view_cache,
&transaction,
&scala_functions,
&table_functions,
Expand Down
2 changes: 1 addition & 1 deletion src/binder/create_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use sqlparser::ast::{Ident, ObjectName, Query};
use std::sync::Arc;
use ulid::Ulid;

impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
impl<T: Transaction> Binder<'_, '_, T> {
pub(crate) fn bind_create_view(
&mut self,
or_replace: &bool,
Expand Down
21 changes: 13 additions & 8 deletions src/binder/delete.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::binder::{lower_case_name, Binder};
use crate::binder::{lower_case_name, Binder, Source};
use crate::errors::DatabaseError;
use crate::planner::operator::delete::DeleteOperator;
use crate::planner::operator::table_scan::TableScanOperator;
Expand All @@ -8,7 +8,7 @@ use crate::storage::Transaction;
use sqlparser::ast::{Expr, TableAlias, TableFactor, TableWithJoins};
use std::sync::Arc;

impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
impl<T: Transaction> Binder<'_, '_, T> {
pub(crate) fn bind_delete(
&mut self,
from: &TableWithJoins,
Expand All @@ -23,15 +23,20 @@ impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
table_alias = Some(Arc::new(name.value.to_lowercase()));
alias_idents = Some(columns);
}
let table_catalog =
self.context
.table_and_bind(table_name.clone(), table_alias.clone(), None)?;
let primary_key_column = table_catalog
.columns()
let source = self
.context
.source_and_bind(table_name.clone(), table_alias.as_ref(), None, false)?
.ok_or(DatabaseError::SourceNotFound)?;
let schema_buf = self.table_schema_buf.entry(table_name.clone()).or_default();
let primary_key_column = source
.columns(schema_buf)
.find(|column| column.desc().is_primary)
.cloned()
.unwrap();
let mut plan = TableScanOperator::build(table_name.clone(), table_catalog);
let mut plan = match source {
Source::Table(table) => TableScanOperator::build(table_name.clone(), table),
Source::View(view) => LogicalPlan::clone(&view.plan),
};

if let Some(alias_idents) = alias_idents {
plan =
Expand Down
2 changes: 1 addition & 1 deletion src/binder/describe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::storage::Transaction;
use sqlparser::ast::ObjectName;
use std::sync::Arc;

impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
impl<T: Transaction> Binder<'_, '_, T> {
pub(crate) fn bind_describe(
&mut self,
name: &ObjectName,
Expand Down
2 changes: 1 addition & 1 deletion src/binder/distinct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::planner::operator::aggregate::AggregateOperator;
use crate::planner::LogicalPlan;
use crate::storage::Transaction;

impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
impl<T: Transaction> Binder<'_, '_, T> {
pub fn bind_distinct(
&mut self,
children: LogicalPlan,
Expand Down
7 changes: 3 additions & 4 deletions src/binder/drop_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,20 @@ use crate::storage::Transaction;
use sqlparser::ast::ObjectName;
use std::sync::Arc;

impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
impl<T: Transaction> Binder<'_, '_, T> {
pub(crate) fn bind_drop_table(
&mut self,
name: &ObjectName,
if_exists: &bool,
) -> Result<LogicalPlan, DatabaseError> {
let table_name = Arc::new(lower_case_name(name)?);

let plan = LogicalPlan::new(
Ok(LogicalPlan::new(
Operator::DropTable(DropTableOperator {
table_name,
if_exists: *if_exists,
}),
vec![],
);
Ok(plan)
))
}
}
2 changes: 1 addition & 1 deletion src/binder/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::planner::operator::Operator;
use crate::planner::LogicalPlan;
use crate::storage::Transaction;

impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
impl<T: Transaction> Binder<'_, '_, T> {
pub(crate) fn bind_explain(&mut self, plan: LogicalPlan) -> Result<LogicalPlan, DatabaseError> {
Ok(LogicalPlan::new(Operator::Explain, vec![plan]))
}
Expand Down
82 changes: 46 additions & 36 deletions src/binder/expr.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::catalog::{ColumnCatalog, ColumnRef};
use crate::catalog::{ColumnCatalog, ColumnRef, TableName};
use crate::errors::DatabaseError;
use crate::expression;
use crate::expression::agg::AggKind;
Expand All @@ -7,6 +7,7 @@ use sqlparser::ast::{
BinaryOperator, CharLengthUnits, DataType, Expr, Function, FunctionArg, FunctionArgExpr, Ident,
Query, UnaryOperator,
};
use std::collections::HashMap;
use std::slice;
use std::sync::Arc;

Expand All @@ -15,7 +16,7 @@ use crate::expression::function::scala::{ArcScalarFunctionImpl, ScalarFunction};
use crate::expression::function::table::{ArcTableFunctionImpl, TableFunction};
use crate::expression::function::FunctionSummary;
use crate::expression::{AliasType, ScalarExpression};
use crate::planner::LogicalPlan;
use crate::planner::{LogicalPlan, SchemaOutput};
use crate::storage::Transaction;
use crate::types::value::{DataValue, Utf8Type};
use crate::types::{ColumnId, LogicalType};
Expand All @@ -39,7 +40,7 @@ macro_rules! try_default {
};
}

impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
impl<'a, T: Transaction> Binder<'a, '_, T> {
pub(crate) fn bind_expr(&mut self, expr: &Expr) -> Result<ScalarExpression, DatabaseError> {
match expr {
Expr::Identifier(ident) => {
Expand Down Expand Up @@ -249,6 +250,7 @@ impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
) -> Result<(LogicalPlan, ColumnRef), DatabaseError> {
let BinderContext {
table_cache,
view_cache,
transaction,
scala_functions,
table_functions,
Expand All @@ -258,6 +260,7 @@ impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
let mut binder = Binder::new(
BinderContext::new(
table_cache,
view_cache,
*transaction,
scala_functions,
table_functions,
Expand Down Expand Up @@ -324,46 +327,53 @@ impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
try_default!(&full_name.0, full_name.1);
}
if let Some(table) = full_name.0.or(bind_table_name) {
let table_catalog = self.context.bind_table(&table, self.parent)?;

let column_catalog = table_catalog
.get_column_by_name(&full_name.1)
.ok_or_else(|| DatabaseError::NotFound("column", full_name.1))?;
Ok(ScalarExpression::ColumnRef(column_catalog.clone()))
let source = self.context.bind_source(&table, self.parent)?;
let schema_buf = self.table_schema_buf.entry(Arc::new(table)).or_default();

Ok(ScalarExpression::ColumnRef(
source
.column(&full_name.1, schema_buf)
.ok_or_else(|| DatabaseError::NotFound("column", full_name.1.to_string()))?,
))
} else {
let op = |got_column: &mut Option<ScalarExpression>, context: &BinderContext<'a, T>| {
for ((_, alias, _), table_catalog) in context.bind_table.iter() {
if got_column.is_some() {
break;
}
if let Some(alias) = alias {
*got_column = self.context.expr_aliases.iter().find_map(
|((alias_table, alias_column), expr)| {
matches!(
alias_table
.as_ref()
.map(|table_name| table_name == alias.as_ref()
&& alias_column == &full_name.1),
Some(true)
)
.then(|| expr.clone())
},
);
} else if let Some(column_catalog) =
table_catalog.get_column_by_name(&full_name.1)
{
*got_column = Some(ScalarExpression::ColumnRef(column_catalog.clone()));
let op =
|got_column: &mut Option<ScalarExpression>,
context: &BinderContext<'a, T>,
table_schema_buf: &mut HashMap<TableName, Option<SchemaOutput>>| {
for ((table_name, alias, _), source) in context.bind_table.iter() {
if got_column.is_some() {
break;
}
if let Some(alias) = alias {
*got_column = self.context.expr_aliases.iter().find_map(
|((alias_table, alias_column), expr)| {
matches!(
alias_table
.as_ref()
.map(|table_name| table_name == alias.as_ref()
&& alias_column == &full_name.1),
Some(true)
)
.then(|| expr.clone())
},
);
} else if let Some(column) = {
let schema_buf =
table_schema_buf.entry(table_name.clone()).or_default();
source.column(&full_name.1, schema_buf)
} {
*got_column = Some(ScalarExpression::ColumnRef(column));
}
}
}
};
};
// handle col syntax
let mut got_column = None;

op(&mut got_column, &self.context);
op(&mut got_column, &self.context, &mut self.table_schema_buf);
if let Some(parent) = self.parent {
op(&mut got_column, &parent.context);
op(&mut got_column, &parent.context, &mut self.table_schema_buf);
}
Ok(got_column.ok_or_else(|| DatabaseError::NotFound("column", full_name.1))?)
Ok(got_column.ok_or(DatabaseError::NotFound("column", full_name.1))?)
}
}

Expand Down
Loading

0 comments on commit a754969

Please sign in to comment.