Skip to content

Commit

Permalink
Feat/View (#236)
Browse files Browse the repository at this point in the history
* feat: impl `View` Encode & Decode

* chore: add `ReferenceSerialization` for `DataValue::Tuple` & fix subquery plan on `ReferenceSerialization`

* feat: impl `CreateView`

* feat: impl `View` as data source

* test: add test for `View`

* chore: add feature `is_sorted`

* chore: version up
  • Loading branch information
KKould authored Nov 3, 2024
1 parent ae59a91 commit 7498b21
Show file tree
Hide file tree
Showing 97 changed files with 1,688 additions and 702 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

[package]
name = "fnck_sql"
version = "0.0.3"
version = "0.0.4"
edition = "2021"
authors = ["Kould <[email protected]>", "Xwg <[email protected]>"]
description = "SQL as a Function for Rust"
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,11 @@ let fnck_sql = DataBaseBuilder::path("./data")
- Create
- [x] Table
- [x] Index: Unique\Normal\Composite
- [x] View
- Drop
- [x] Table
- [ ] Index
- [ ] View
- Alert
- [x] Add Column
- [x] Drop Column
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain
Original file line number Diff line number Diff line change
@@ -1 +1 @@
nightly-2024-10-18
nightly-2024-10-10
2 changes: 1 addition & 1 deletion src/binder/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::{

use super::{Binder, QueryBindStep};

impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
impl<T: Transaction> Binder<'_, '_, T> {
pub fn bind_aggregate(
&mut self,
children: LogicalPlan,
Expand Down
4 changes: 2 additions & 2 deletions src/binder/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::planner::operator::Operator;
use crate::planner::LogicalPlan;
use crate::storage::Transaction;

impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
impl<T: Transaction> Binder<'_, '_, T> {
pub(crate) fn bind_alter_table(
&mut self,
name: &ObjectName,
Expand All @@ -21,7 +21,7 @@ impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
let table_name: Arc<String> = Arc::new(lower_case_name(name)?);
let table = self
.context
.table(table_name.clone())
.table(table_name.clone())?
.ok_or(DatabaseError::TableNotFound)?;
let plan = match operation {
AlterTableOperation::AddColumn {
Expand Down
20 changes: 14 additions & 6 deletions src/binder/analyze.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::binder::{lower_case_name, Binder};
use crate::binder::{lower_case_name, Binder, Source};
use crate::errors::DatabaseError;
use crate::planner::operator::analyze::AnalyzeOperator;
use crate::planner::operator::table_scan::TableScanOperator;
Expand All @@ -8,16 +8,24 @@ use crate::storage::Transaction;
use sqlparser::ast::ObjectName;
use std::sync::Arc;

impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
impl<T: Transaction> Binder<'_, '_, T> {
pub(crate) fn bind_analyze(&mut self, name: &ObjectName) -> Result<LogicalPlan, DatabaseError> {
let table_name = Arc::new(lower_case_name(name)?);

let table_catalog = self
let table = self
.context
.table_and_bind(table_name.clone(), None, None)?;
let index_metas = table_catalog.indexes.clone();
.source_and_bind(table_name.clone(), None, None, true)?
.and_then(|source| {
if let Source::Table(table) = source {
Some(table)
} else {
None
}
})
.ok_or(DatabaseError::TableNotFound)?;
let index_metas = table.indexes.clone();

let scan_op = TableScanOperator::build(table_name.clone(), table_catalog);
let scan_op = TableScanOperator::build(table_name.clone(), table);
Ok(LogicalPlan::new(
Operator::Analyze(AnalyzeOperator {
table_name,
Expand Down
17 changes: 3 additions & 14 deletions src/binder/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,7 @@ use serde::{Deserialize, Serialize};
use serde_macros::ReferenceSerialization;
use sqlparser::ast::{CopyOption, CopySource, CopyTarget};

#[derive(
Debug,
PartialEq,
PartialOrd,
Ord,
Hash,
Eq,
Clone,
Serialize,
Deserialize,
ReferenceSerialization,
)]
#[derive(Debug, PartialEq, PartialOrd, Ord, Hash, Eq, Clone, ReferenceSerialization)]
pub struct ExtSource {
pub path: PathBuf,
pub format: FileFormat,
Expand Down Expand Up @@ -73,7 +62,7 @@ impl FromStr for ExtSource {
}
}

impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
impl<T: Transaction> Binder<'_, '_, T> {
pub(super) fn bind_copy(
&mut self,
source: CopySource,
Expand All @@ -91,7 +80,7 @@ impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
}
};

if let Some(table) = self.context.table(Arc::new(table_name.to_string())) {
if let Some(table) = self.context.table(Arc::new(table_name.to_string()))? {
let schema_ref = table.schema_ref().clone();
let ext_source = ExtSource {
path: match target {
Expand Down
14 changes: 9 additions & 5 deletions src/binder/create_index.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::binder::{lower_case_name, Binder};
use crate::binder::{lower_case_name, Binder, Source};
use crate::errors::DatabaseError;
use crate::expression::ScalarExpression;
use crate::planner::operator::create_index::CreateIndexOperator;
Expand All @@ -10,7 +10,7 @@ use crate::types::index::IndexType;
use sqlparser::ast::{ObjectName, OrderByExpr};
use std::sync::Arc;

impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
impl<T: Transaction> Binder<'_, '_, T> {
pub(crate) fn bind_create_index(
&mut self,
table_name: &ObjectName,
Expand All @@ -29,10 +29,14 @@ impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
IndexType::Composite
};

let table = self
let source = self
.context
.table_and_bind(table_name.clone(), None, None)?;
let plan = TableScanOperator::build(table_name.clone(), table);
.source_and_bind(table_name.clone(), None, None, false)?
.ok_or(DatabaseError::SourceNotFound)?;
let plan = match source {
Source::Table(table) => TableScanOperator::build(table_name.clone(), table),
Source::View(view) => LogicalPlan::clone(&view.plan),
};
let mut columns = Vec::with_capacity(exprs.len());

for expr in exprs {
Expand Down
22 changes: 12 additions & 10 deletions src/binder/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::planner::LogicalPlan;
use crate::storage::Transaction;
use crate::types::LogicalType;

impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
impl<T: Transaction> Binder<'_, '_, T> {
// TODO: TableConstraint
pub(crate) fn bind_create_table(
&mut self,
Expand Down Expand Up @@ -62,9 +62,9 @@ impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
.find(|column| column.name() == column_name)
{
if *is_primary {
column.desc.is_primary = true;
column.desc_mut().is_primary = true;
} else {
column.desc.is_unique = true;
column.desc_mut().is_unique = true;
}
}
}
Expand All @@ -73,7 +73,7 @@ impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
}
}

if columns.iter().filter(|col| col.desc.is_primary).count() != 1 {
if columns.iter().filter(|col| col.desc().is_primary).count() != 1 {
return Err(DatabaseError::InvalidTable(
"The primary key field must exist and have at least one".to_string(),
));
Expand Down Expand Up @@ -158,13 +158,15 @@ mod tests {
let storage = RocksStorage::new(temp_dir.path())?;
let transaction = storage.transaction()?;
let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?);
let view_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?);
let scala_functions = Default::default();
let table_functions = Default::default();

let sql = "create table t1 (id int primary key, name varchar(10) null)";
let mut binder = Binder::new(
BinderContext::new(
&table_cache,
&view_cache,
&transaction,
&scala_functions,
&table_functions,
Expand All @@ -179,16 +181,16 @@ mod tests {
Operator::CreateTable(op) => {
debug_assert_eq!(op.table_name, Arc::new("t1".to_string()));
debug_assert_eq!(op.columns[0].name(), "id");
debug_assert_eq!(op.columns[0].nullable, false);
debug_assert_eq!(op.columns[0].nullable(), false);
debug_assert_eq!(
op.columns[0].desc,
ColumnDesc::new(LogicalType::Integer, true, false, None)?
op.columns[0].desc(),
&ColumnDesc::new(LogicalType::Integer, true, false, None)?
);
debug_assert_eq!(op.columns[1].name(), "name");
debug_assert_eq!(op.columns[1].nullable, true);
debug_assert_eq!(op.columns[1].nullable(), true);
debug_assert_eq!(
op.columns[1].desc,
ColumnDesc::new(
op.columns[1].desc(),
&ColumnDesc::new(
LogicalType::Varchar(Some(10), CharLengthUnits::Characters),
false,
false,
Expand Down
62 changes: 62 additions & 0 deletions src/binder/create_view.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
use crate::binder::{lower_case_name, lower_ident, Binder};
use crate::catalog::view::View;
use crate::catalog::{ColumnCatalog, ColumnRef};
use crate::errors::DatabaseError;
use crate::expression::{AliasType, ScalarExpression};
use crate::planner::operator::create_view::CreateViewOperator;
use crate::planner::operator::Operator;
use crate::planner::LogicalPlan;
use crate::storage::Transaction;
use itertools::Itertools;
use sqlparser::ast::{Ident, ObjectName, Query};
use std::sync::Arc;
use ulid::Ulid;

impl<T: Transaction> Binder<'_, '_, T> {
pub(crate) fn bind_create_view(
&mut self,
or_replace: &bool,
name: &ObjectName,
columns: &[Ident],
query: &Query,
) -> Result<LogicalPlan, DatabaseError> {
let view_name = Arc::new(lower_case_name(name)?);
let mut plan = self.bind_query(query)?;

if !columns.is_empty() {
let mapping_schema = plan.output_schema();
let exprs = columns
.iter()
.enumerate()
.map(|(i, ident)| {
let mapping_column = &mapping_schema[i];
let mut column = ColumnCatalog::new(
lower_ident(ident),
mapping_column.nullable(),
mapping_column.desc().clone(),
);
column.set_ref_table(view_name.clone(), Ulid::new(), true);

ScalarExpression::Alias {
expr: Box::new(ScalarExpression::ColumnRef(mapping_column.clone())),
alias: AliasType::Expr(Box::new(ScalarExpression::ColumnRef(
ColumnRef::from(column),
))),
}
})
.collect_vec();
plan = self.bind_project(plan, exprs)?;
}

Ok(LogicalPlan::new(
Operator::CreateView(CreateViewOperator {
view: View {
name: view_name,
plan: Box::new(plan),
},
or_replace: *or_replace,
}),
vec![],
))
}
}
23 changes: 14 additions & 9 deletions src/binder/delete.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::binder::{lower_case_name, Binder};
use crate::binder::{lower_case_name, Binder, Source};
use crate::errors::DatabaseError;
use crate::planner::operator::delete::DeleteOperator;
use crate::planner::operator::table_scan::TableScanOperator;
Expand All @@ -8,7 +8,7 @@ use crate::storage::Transaction;
use sqlparser::ast::{Expr, TableAlias, TableFactor, TableWithJoins};
use std::sync::Arc;

impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
impl<T: Transaction> Binder<'_, '_, T> {
pub(crate) fn bind_delete(
&mut self,
from: &TableWithJoins,
Expand All @@ -23,15 +23,20 @@ impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
table_alias = Some(Arc::new(name.value.to_lowercase()));
alias_idents = Some(columns);
}
let table_catalog =
self.context
.table_and_bind(table_name.clone(), table_alias.clone(), None)?;
let primary_key_column = table_catalog
.columns()
.find(|column| column.desc.is_primary)
let source = self
.context
.source_and_bind(table_name.clone(), table_alias.as_ref(), None, false)?
.ok_or(DatabaseError::SourceNotFound)?;
let schema_buf = self.table_schema_buf.entry(table_name.clone()).or_default();
let primary_key_column = source
.columns(schema_buf)
.find(|column| column.desc().is_primary)
.cloned()
.unwrap();
let mut plan = TableScanOperator::build(table_name.clone(), table_catalog);
let mut plan = match source {
Source::Table(table) => TableScanOperator::build(table_name.clone(), table),
Source::View(view) => LogicalPlan::clone(&view.plan),
};

if let Some(alias_idents) = alias_idents {
plan =
Expand Down
2 changes: 1 addition & 1 deletion src/binder/describe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::storage::Transaction;
use sqlparser::ast::ObjectName;
use std::sync::Arc;

impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
impl<T: Transaction> Binder<'_, '_, T> {
pub(crate) fn bind_describe(
&mut self,
name: &ObjectName,
Expand Down
2 changes: 1 addition & 1 deletion src/binder/distinct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::planner::operator::aggregate::AggregateOperator;
use crate::planner::LogicalPlan;
use crate::storage::Transaction;

impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
impl<T: Transaction> Binder<'_, '_, T> {
pub fn bind_distinct(
&mut self,
children: LogicalPlan,
Expand Down
7 changes: 3 additions & 4 deletions src/binder/drop_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,20 @@ use crate::storage::Transaction;
use sqlparser::ast::ObjectName;
use std::sync::Arc;

impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
impl<T: Transaction> Binder<'_, '_, T> {
pub(crate) fn bind_drop_table(
&mut self,
name: &ObjectName,
if_exists: &bool,
) -> Result<LogicalPlan, DatabaseError> {
let table_name = Arc::new(lower_case_name(name)?);

let plan = LogicalPlan::new(
Ok(LogicalPlan::new(
Operator::DropTable(DropTableOperator {
table_name,
if_exists: *if_exists,
}),
vec![],
);
Ok(plan)
))
}
}
2 changes: 1 addition & 1 deletion src/binder/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::planner::operator::Operator;
use crate::planner::LogicalPlan;
use crate::storage::Transaction;

impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
impl<T: Transaction> Binder<'_, '_, T> {
pub(crate) fn bind_explain(&mut self, plan: LogicalPlan) -> Result<LogicalPlan, DatabaseError> {
Ok(LogicalPlan::new(Operator::Explain, vec![plan]))
}
Expand Down
Loading

0 comments on commit 7498b21

Please sign in to comment.