Skip to content

Commit

Permalink
impl dml(update) (#50)
Browse files Browse the repository at this point in the history
* feat: impl dml(update)

* docs: add `Bloom` Banner
  • Loading branch information
KKould authored Aug 23, 2023
1 parent 512690f commit 87f254c
Show file tree
Hide file tree
Showing 18 changed files with 224 additions and 50 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ select * from t1 order by a asc nulls first
- [x] Limit
- DML
- [x] Insert
- [ ] Update
- [x] Update
- [ ] Delete
- DataTypes
- Invalid
Expand Down
18 changes: 8 additions & 10 deletions src/binder/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ use crate::planner::operator::values::ValuesOperator;
use crate::types::value::ValueRef;

impl Binder {

// TODO: 支持Project
pub(crate) fn bind_insert(
&mut self,
name: ObjectName,
Expand All @@ -24,21 +22,21 @@ impl Binder {

if let Some(table) = self.context.catalog.get_table_by_name(table_name) {
let table_id = table.id;
let mut col_catalogs = Vec::new();
let mut columns = Vec::new();

if idents.is_empty() {
col_catalogs = table.all_columns()
columns = table.all_columns()
.into_iter()
.map(|(_, catalog)| catalog.clone())
.collect_vec();
} else {
let bind_table_name = Some(table.name.to_string());
let bind_table_name = Some(table_name.to_string());
for ident in idents {
match self.bind_column_ref_from_identifiers(
slice::from_ref(ident),
bind_table_name.as_ref()
)? {
ScalarExpression::ColumnRef(catalog) => col_catalogs.push(catalog),
ScalarExpression::ColumnRef(catalog) => columns.push(catalog),
_ => unreachable!()
}
}
Expand All @@ -56,7 +54,7 @@ impl Binder {
})
.try_collect()?;

let values_plan = self.bind_values(rows, col_catalogs.clone());
let values_plan = self.bind_values(rows, columns);

Ok(LogicalPlan {
operator: Operator::Insert(
Expand All @@ -71,15 +69,15 @@ impl Binder {
}
}

fn bind_values(
pub(crate) fn bind_values(
&mut self,
rows: Vec<Vec<ValueRef>>,
col_catalogs: Vec<ColumnRef>
columns: Vec<ColumnRef>
) -> LogicalPlan {
LogicalPlan {
operator: Operator::Values(ValuesOperator {
rows,
columns: col_catalogs,
columns,
}),
childrens: vec![],
}
Expand Down
8 changes: 8 additions & 0 deletions src/binder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mod create_table;
pub mod expr;
mod select;
mod insert;
mod update;

use std::collections::BTreeMap;
use sqlparser::ast::{Ident, ObjectName, SetExpr, Statement};
Expand Down Expand Up @@ -71,6 +72,13 @@ impl Binder {
todo!()
}
}
Statement::Update { table, selection, assignments, .. } => {
if !table.joins.is_empty() {
unimplemented!()
} else {
self.bind_update(table, selection, assignments)?
}
}
_ => unimplemented!(),
};
Ok(plan)
Expand Down
4 changes: 2 additions & 2 deletions src/binder/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ impl Binder {
Ok(plan)
}

fn bind_table_ref(&mut self, from: &[TableWithJoins]) -> Result<LogicalPlan, BindError> {
pub(crate) fn bind_table_ref(&mut self, from: &[TableWithJoins]) -> Result<LogicalPlan, BindError> {
assert!(from.len() < 2, "not support yet.");
if from.is_empty() {
return Ok(LogicalPlan {
Expand Down Expand Up @@ -253,7 +253,7 @@ impl Binder {
Ok(LJoinOperator::new(left, right, on, join_type))
}

fn bind_where(
pub(crate) fn bind_where(
&mut self,
children: LogicalPlan,
predicate: &Expr,
Expand Down
72 changes: 72 additions & 0 deletions src/binder/update.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use std::slice;
use sqlparser::ast::{Assignment, Expr, TableFactor, TableWithJoins};
use crate::binder::{Binder, BindError, lower_case_name, split_name};
use crate::expression::ScalarExpression;
use crate::planner::LogicalPlan;
use crate::planner::operator::Operator;
use crate::planner::operator::update::UpdateOperator;
use crate::types::value::ValueRef;

impl Binder {
pub(crate) fn bind_update(
&mut self,
to: &TableWithJoins,
selection: &Option<Expr>,
assignments: &[Assignment]
) -> Result<LogicalPlan, BindError> {
if let TableFactor::Table { name, .. } = &to.relation {
let name = lower_case_name(&name);
let (_, table_name) = split_name(&name)?;

let mut plan = self.bind_table_ref(slice::from_ref(to))?;

if let Some(predicate) = selection {
plan = self.bind_where(plan, predicate)?;
}

if let Some(table) = self.context.catalog.get_table_by_name(table_name) {
let table_id = table.id;
let bind_table_name = Some(table_name.to_string());

let mut columns = Vec::with_capacity(assignments.len());
let mut row = Vec::with_capacity(assignments.len());


for assignment in assignments {
let value = match self.bind_expr(&assignment.value)? {
ScalarExpression::Constant(value) => Ok::<ValueRef, BindError>(value),
_ => unreachable!(),
}?;

for ident in &assignment.id {
match self.bind_column_ref_from_identifiers(
slice::from_ref(&ident),
bind_table_name.as_ref()
)? {
ScalarExpression::ColumnRef(catalog) => {
columns.push(catalog);
row.push(value.clone());
},
_ => unreachable!()
}
}
}

let values_plan = self.bind_values(vec![row], columns);

Ok(LogicalPlan {
operator: Operator::Update(
UpdateOperator {
table_id,
}
),
childrens: vec![plan, values_plan],
})
} else {
Err(BindError::InvalidTable(format!("not found table {}", table_name)))
}
} else {
unreachable!("only table")
}
}
}
7 changes: 7 additions & 0 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,13 @@ mod test {
let tuples_full_join = kipsql.run("select * from t1 full join t2 on a = c").await?;
println!("{}", create_table(&tuples_full_join));

println!("update t1 and filter:");
let _ = kipsql.run("update t1 set a = 0 where b > 1").await?;
println!("after t1:");
let update_after_full_t1 = kipsql.run("select * from t1").await?;
println!("{}", create_table(&update_after_full_t1));


Ok(())
})
}
Expand Down
4 changes: 2 additions & 2 deletions src/execution/executor/dml/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::catalog::CatalogError;
use crate::execution::executor::BoxedExecutor;
use crate::execution::ExecutorError;
use crate::storage::{Storage, Table};
use crate::types::{ColumnId, TableId};
use crate::types::{ColumnId, IdGenerator, TableId};
use crate::types::tuple::Tuple;
use crate::types::value::{DataValue, ValueRef};

Expand All @@ -29,7 +29,7 @@ impl Insert {
let all_columns = table_catalog.all_columns();

let mut tuple = Tuple {
id: None,
id: Some(IdGenerator::build() as usize),
columns: Vec::with_capacity(all_columns.len()),
values: Vec::with_capacity(all_columns.len()),
};
Expand Down
1 change: 1 addition & 0 deletions src/execution/executor/dml/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub(crate) mod insert;
pub(crate) mod update;
45 changes: 45 additions & 0 deletions src/execution/executor/dml/update.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use std::collections::HashMap;
use futures_async_stream::try_stream;
use crate::catalog::CatalogError;
use crate::execution::executor::BoxedExecutor;
use crate::execution::ExecutorError;
use crate::storage::{Storage, Table};
use crate::types::TableId;
use crate::types::tuple::Tuple;

pub struct Update { }

impl Update {
#[try_stream(boxed, ok = Tuple, error = ExecutorError)]
pub async fn execute(table_id: TableId, input: BoxedExecutor, values: BoxedExecutor, storage: impl Storage) {
if let Some(table_catalog) = storage.get_catalog().get_table(&table_id) {
let mut value_map = HashMap::new();

// only once
#[for_await]
for tuple in values {
let Tuple { columns, values, .. } = tuple?;
for i in 0..columns.len() {
value_map.insert(columns[i].id, values[i].clone());
}
}

let table = storage.get_table(&table_catalog.id)?;

#[for_await]
for tuple in input {
let mut tuple = tuple?;

for (i, column) in tuple.columns.iter().enumerate() {
if let Some(value) = value_map.get(&column.id) {
tuple.values[i] = value.clone();
}
}

table.append(tuple)?;
}
} else {
Err(CatalogError::NotFound("root", table_id.to_string()))?;
}
}
}
8 changes: 8 additions & 0 deletions src/execution/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@ use crate::execution::physical_plan::PhysicalPlan;
use crate::execution::executor::ddl::create::CreateTable;
use crate::execution::executor::dql::filter::Filter;
use crate::execution::executor::dml::insert::Insert;
use crate::execution::executor::dml::update::Update;
use crate::execution::executor::dql::join::hash_join::HashJoin;
use crate::execution::executor::dql::limit::Limit;
use crate::execution::executor::dql::projection::Projection;
use crate::execution::executor::dql::seq_scan::SeqScan;
use crate::execution::executor::dql::sort::Sort;
use crate::execution::executor::dql::values::Values;
use crate::execution::ExecutorError;
use crate::execution::physical_plan::physical_update::PhysicalUpdate;
use crate::planner::operator::join::JoinOperator;
use crate::storage::memory::MemStorage;
use crate::types::tuple::Tuple;
Expand Down Expand Up @@ -53,6 +55,12 @@ impl Executor {

Insert::execute(table_id, input, self.storage.clone())
}
PhysicalPlan::Update(PhysicalUpdate { table_id, input, values}) => {
let input = self.build(*input);
let values = self.build(*values);

Update::execute(table_id, input, values, self.storage.clone())
}
PhysicalPlan::Values(op) => {
Values::execute(op)
}
Expand Down
3 changes: 3 additions & 0 deletions src/execution/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::execution::physical_plan::physical_limit::PhysicalLimit;
use crate::execution::physical_plan::physical_projection::PhysicalProjection;
use crate::execution::physical_plan::physical_sort::PhysicalSort;
use crate::execution::physical_plan::physical_table_scan::PhysicalTableScan;
use crate::execution::physical_plan::physical_update::PhysicalUpdate;
use crate::execution::physical_plan::physical_values::PhysicalValues;

pub(crate) mod physical_create_table;
Expand All @@ -18,10 +19,12 @@ pub(crate) mod physical_filter;
pub(crate) mod physical_sort;
pub(crate) mod physical_limit;
pub(crate) mod physical_hash_join;
pub(crate) mod physical_update;

#[derive(Debug)]
pub enum PhysicalPlan {
Insert(PhysicalInsert),
Update(PhysicalUpdate),
CreateTable(PhysicalCreateTable),
TableScan(PhysicalTableScan),
Projection(PhysicalProjection),
Expand Down
19 changes: 19 additions & 0 deletions src/execution/physical_plan/physical_plan_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::execution::physical_plan::physical_hash_join::PhysicalHashJoin;
use crate::execution::physical_plan::physical_insert::PhysicalInsert;
use crate::execution::physical_plan::physical_limit::PhysicalLimit;
use crate::execution::physical_plan::physical_sort::PhysicalSort;
use crate::execution::physical_plan::physical_update::PhysicalUpdate;
use crate::execution::physical_plan::physical_values::PhysicalValues;
use crate::planner::operator::create_table::CreateTableOperator;
use crate::planner::operator::filter::FilterOperator;
Expand All @@ -18,6 +19,7 @@ use crate::planner::operator::join::{JoinOperator, JoinType};
use crate::planner::operator::limit::LimitOperator;
use crate::planner::operator::project::ProjectOperator;
use crate::planner::operator::sort::SortOperator;
use crate::planner::operator::update::UpdateOperator;
use crate::planner::operator::values::ValuesOperator;

pub struct PhysicalPlanMapping;
Expand Down Expand Up @@ -65,6 +67,12 @@ impl PhysicalPlanMapping {

Self::build_physical_join(left_child, right_child, op)?
}
Operator::Update(op) => {
let input = plan.childrens.remove(0);
let values = plan.childrens.remove(0);

Self::build_physical_update(input, values, op)?
}
_ => return Err(MappingError::Unsupported(format!("{:?}", plan.operator))),
};

Expand Down Expand Up @@ -146,4 +154,15 @@ impl PhysicalPlanMapping {
}))
}
}

fn build_physical_update(input: LogicalPlan, values: LogicalPlan, op: UpdateOperator) -> Result<PhysicalPlan, MappingError> {
let input = Box::new(Self::build_plan(input)?);
let values = Box::new(Self::build_plan(values)?);

Ok(PhysicalPlan::Update(PhysicalUpdate {
table_id: op.table_id,
input,
values,
}))
}
}
9 changes: 9 additions & 0 deletions src/execution/physical_plan/physical_update.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
use crate::execution::physical_plan::PhysicalPlan;
use crate::types::TableId;

#[derive(Debug)]
pub struct PhysicalUpdate {
pub(crate) table_id: TableId,
pub(crate) input: Box<PhysicalPlan>,
pub(crate) values: Box<PhysicalPlan>
}
Loading

0 comments on commit 87f254c

Please sign in to comment.