Skip to content

Commit

Permalink
test with vec table
Browse files Browse the repository at this point in the history
  • Loading branch information
Colin Ho authored and Colin Ho committed Oct 5, 2024
1 parent e429fdd commit 043b8c7
Show file tree
Hide file tree
Showing 18 changed files with 178 additions and 153 deletions.
6 changes: 4 additions & 2 deletions src/daft-local-execution/src/intermediate_ops/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::sync::Arc;

use common_error::DaftResult;
use daft_dsl::ExprRef;
use daft_table::Table;
use tracing::instrument;

use super::intermediate_op::{
Expand Down Expand Up @@ -31,9 +32,10 @@ impl IntermediateOperator for AggregateOperator {
input: &PipelineResultType,
_state: Option<&mut Box<dyn IntermediateOperatorState>>,
) -> DaftResult<IntermediateOperatorResult> {
let out = input.as_data().agg(&self.agg_exprs, &self.group_by)?;
let input = Table::concat(input.as_data())?;
let out = input.agg(&self.agg_exprs, &self.group_by)?;
Ok(IntermediateOperatorResult::NeedMoreInput(Some(Arc::new(
out,
vec![out; 1],
))))
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,9 @@
use std::sync::Arc;

use common_error::DaftResult;
use daft_core::{
prelude::{
bitmap::{Bitmap, MutableBitmap},
BooleanArray,
},
series::IntoSeries,
};
use daft_dsl::ExprRef;
use daft_plan::JoinType;
use daft_table::{Probeable, Table};
use daft_table::{GrowableTable, Probeable, Table};
use tracing::{info_span, instrument};

use super::intermediate_op::{
Expand Down Expand Up @@ -60,31 +53,36 @@ impl AntiSemiProbeOperator {
}
}

fn probe_anti_semi(&self, input: &Table, state: &mut AntiSemiProbeState) -> DaftResult<Table> {
fn probe_anti_semi(
&self,
input: &[Table],
state: &mut AntiSemiProbeState,
) -> DaftResult<Table> {
let probe_set = state.get_probeable();

let _growables = info_span!("AntiSemiOperator::build_growables").entered();

let mut input_idx_matches = MutableBitmap::from_len_zeroed(input.len());
let mut probe_side_growable =
GrowableTable::new(&input.iter().collect::<Vec<_>>(), false, 20)?;

drop(_growables);
{
let _loop = info_span!("AntiSemiOperator::eval_and_probe").entered();
let join_keys = input.eval_expression_list(&self.probe_on)?;
let iter = probe_set.probe_exists(&join_keys)?;

for (probe_row_idx, matched) in iter.enumerate() {
match (self.join_type == JoinType::Semi, matched) {
(true, true) | (false, false) => {
input_idx_matches.set(probe_row_idx, true);
for (probe_side_table_idx, table) in input.iter().enumerate() {
let join_keys = table.eval_expression_list(&self.probe_on)?;
let iter = probe_set.probe_exists(&join_keys)?;

for (probe_row_idx, matched) in iter.enumerate() {
match (self.join_type == JoinType::Semi, matched) {
(true, true) | (false, false) => {
probe_side_growable.extend(probe_side_table_idx, probe_row_idx, 1);
}
_ => {}
}
_ => {}
}
}
}
let bitmap: Bitmap = input_idx_matches.into();
let result = input.mask_filter(&BooleanArray::from(("bitmap", bitmap)).into_series())?;
Ok(result)
probe_side_growable.build()
}
}

Expand Down Expand Up @@ -119,7 +117,7 @@ impl IntermediateOperator for AntiSemiProbeOperator {
_ => unreachable!("Only Semi and Anti joins are supported"),
}?;
Ok(IntermediateOperatorResult::NeedMoreInput(Some(Arc::new(
out,
vec![out; 1],
))))
}
}
Expand Down
37 changes: 19 additions & 18 deletions src/daft-local-execution/src/intermediate_ops/buffer.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::{cmp::Ordering::*, collections::VecDeque, sync::Arc};
use std::{cmp::Ordering::*, collections::VecDeque};

use common_error::DaftResult;
use daft_table::Table;

pub struct OperatorBuffer {
pub buffer: VecDeque<Arc<Table>>,
pub buffer: VecDeque<Table>,
pub curr_len: usize,
pub threshold: usize,
}
Expand All @@ -19,22 +19,23 @@ impl OperatorBuffer {
}
}

pub fn push(&mut self, part: Arc<Table>) {
self.curr_len += part.len();
self.buffer.push_back(part);
pub fn push(&mut self, parts: &[Table]) {
for part in parts {
self.buffer.push_back(part.clone());
self.curr_len += part.len();
}
}

pub fn try_clear(&mut self) -> Option<DaftResult<Arc<Table>>> {
pub fn try_clear(&mut self) -> Option<DaftResult<Vec<Table>>> {
match self.curr_len.cmp(&self.threshold) {
Less => None,
Equal => self.clear_all(),
Greater => Some(self.clear_enough()),
}
}

fn clear_enough(&mut self) -> DaftResult<Arc<Table>> {
fn clear_enough(&mut self) -> DaftResult<Vec<Table>> {
assert!(self.curr_len > self.threshold);

let mut to_concat = Vec::with_capacity(self.buffer.len());
let mut remaining = self.threshold;

Expand All @@ -47,28 +48,28 @@ impl OperatorBuffer {
} else {
let (head, tail) = part.split_at(remaining)?;
remaining = 0;
to_concat.push(Arc::new(head));
self.buffer.push_front(Arc::new(tail));
to_concat.push(head);
self.buffer.push_front(tail);
break;
}
}
assert_eq!(remaining, 0);

self.curr_len -= self.threshold;
match to_concat.len() {
1 => Ok(to_concat.pop().unwrap()),
_ => Ok(Arc::new(Table::concat(&to_concat)?)),
}
Ok(to_concat)
}

pub fn clear_all(&mut self) -> Option<DaftResult<Arc<Table>>> {
pub fn clear_all(&mut self) -> Option<DaftResult<Vec<Table>>> {
if self.buffer.is_empty() {
return None;
}

let concated = Table::concat(&std::mem::take(&mut self.buffer).iter().collect::<Vec<_>>())
.map(Arc::new);
self.curr_len = 0;
Some(concated)
Some(
std::mem::take(&mut self.buffer)
.into_iter()
.map(Ok)
.collect(),
)
}
}
7 changes: 6 additions & 1 deletion src/daft-local-execution/src/intermediate_ops/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::sync::Arc;

use common_error::DaftResult;
use daft_dsl::ExprRef;
use daft_table::Table;
use tracing::instrument;

use super::intermediate_op::{
Expand All @@ -27,7 +28,11 @@ impl IntermediateOperator for FilterOperator {
input: &PipelineResultType,
_state: Option<&mut Box<dyn IntermediateOperatorState>>,
) -> DaftResult<IntermediateOperatorResult> {
let out = input.as_data().filter(&[self.predicate.clone()])?;
let out = input
.as_data()
.iter()
.map(|t| t.filter(&[self.predicate.clone()]))
.collect::<DaftResult<Vec<Table>>>()?;
Ok(IntermediateOperatorResult::NeedMoreInput(Some(Arc::new(
out,
))))
Expand Down
104 changes: 54 additions & 50 deletions src/daft-local-execution/src/intermediate_ops/hash_join_probe.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
use std::sync::Arc;

use common_error::DaftResult;
use daft_core::{
prelude::{SchemaRef, UInt64Array},
series::IntoSeries,
};
use daft_core::prelude::SchemaRef;
use daft_dsl::ExprRef;
use daft_plan::JoinType;
use daft_table::{GrowableTable, Probeable, Table};
Expand Down Expand Up @@ -97,38 +94,42 @@ impl HashJoinProbeOperator {
}
}

fn probe_inner(&self, input: &Table, state: &mut HashJoinProbeState) -> DaftResult<Table> {
fn probe_inner(&self, input: &[Table], state: &mut HashJoinProbeState) -> DaftResult<Table> {
let (probe_table, tables) = state.get_probeable_and_table();

let _growables = info_span!("HashJoinOperator::build_growables").entered();

let mut build_side_growable =
GrowableTable::new(&tables.iter().collect::<Vec<_>>(), false, 20)?;

let mut input_idx_matches = vec![];
let mut probe_side_growable =
GrowableTable::new(&input.iter().collect::<Vec<_>>(), false, 20)?;

drop(_growables);
{
let _loop = info_span!("HashJoinOperator::eval_and_probe").entered();
let join_keys = input.eval_expression_list(&self.probe_on)?;
let idx_mapper = probe_table.probe_indices(&join_keys)?;

for (probe_row_idx, inner_iter) in idx_mapper.make_iter().enumerate() {
if let Some(inner_iter) = inner_iter {
for (build_side_table_idx, build_row_idx) in inner_iter {
build_side_growable.extend(
build_side_table_idx as usize,
build_row_idx as usize,
1,
);
input_idx_matches.push(probe_row_idx as u64);
for (probe_side_table_idx, table) in input.iter().enumerate() {
// we should emit one table at a time when this is streaming
let join_keys = table.eval_expression_list(&self.probe_on)?;
let idx_mapper = probe_table.probe_indices(&join_keys)?;

for (probe_row_idx, inner_iter) in idx_mapper.make_iter().enumerate() {
if let Some(inner_iter) = inner_iter {
for (build_side_table_idx, build_row_idx) in inner_iter {
build_side_growable.extend(
build_side_table_idx as usize,
build_row_idx as usize,
1,
);
// we can perform run length compression for this to make this more efficient
probe_side_growable.extend(probe_side_table_idx, probe_row_idx, 1);
}
}
}
}
}
let build_side_table = build_side_growable.build()?;
let probe_side_table =
input.take(&UInt64Array::from(("matches", input_idx_matches)).into_series())?;
let probe_side_table = probe_side_growable.build()?;

let (left_table, right_table) = if self.build_on_left {
(build_side_table, probe_side_table)
Expand All @@ -139,14 +140,16 @@ impl HashJoinProbeOperator {
let join_keys_table = left_table.get_columns(&self.common_join_keys)?;
let left_non_join_columns = left_table.get_columns(&self.left_non_join_columns)?;
let right_non_join_columns = right_table.get_columns(&self.right_non_join_columns)?;
let final_table = join_keys_table
join_keys_table
.union(&left_non_join_columns)?
.union(&right_non_join_columns)?;

Ok(final_table)
.union(&right_non_join_columns)
}

fn probe_left_right(&self, input: &Table, state: &mut HashJoinProbeState) -> DaftResult<Table> {
fn probe_left_right(
&self,
input: &[Table],
state: &mut HashJoinProbeState,
) -> DaftResult<Table> {
let (probe_table, tables) = state.get_probeable_and_table();

let _growables = info_span!("HashJoinOperator::build_growables").entered();
Expand All @@ -157,47 +160,48 @@ impl HashJoinProbeOperator {
tables.iter().map(|t| t.len()).sum(),
)?;

let mut input_idx_matches = Vec::with_capacity(input.len());
let mut probe_side_growable =
GrowableTable::new(&input.iter().collect::<Vec<_>>(), false, input.len())?;

drop(_growables);
{
let _loop = info_span!("HashJoinOperator::eval_and_probe").entered();
let join_keys = input.eval_expression_list(&self.probe_on)?;
let idx_mapper = probe_table.probe_indices(&join_keys)?;

for (probe_row_idx, inner_iter) in idx_mapper.make_iter().enumerate() {
if let Some(inner_iter) = inner_iter {
for (build_side_table_idx, build_row_idx) in inner_iter {
build_side_growable.extend(
build_side_table_idx as usize,
build_row_idx as usize,
1,
);
input_idx_matches.push(probe_row_idx as u64);
for (probe_side_table_idx, table) in input.iter().enumerate() {
let join_keys = table.eval_expression_list(&self.probe_on)?;
let idx_mapper = probe_table.probe_indices(&join_keys)?;

for (probe_row_idx, inner_iter) in idx_mapper.make_iter().enumerate() {
if let Some(inner_iter) = inner_iter {
for (build_side_table_idx, build_row_idx) in inner_iter {
build_side_growable.extend(
build_side_table_idx as usize,
build_row_idx as usize,
1,
);
probe_side_growable.extend(probe_side_table_idx, probe_row_idx, 1);
}
} else {
// if there's no match, we should still emit the probe side and fill the build side with nulls
build_side_growable.add_nulls(1);
probe_side_growable.extend(probe_side_table_idx, probe_row_idx, 1);
}
} else {
// if there's no match, we should still emit the probe side and fill the build side with nulls
build_side_growable.add_nulls(1);
input_idx_matches.push(probe_row_idx as u64);
}
}
}
let build_side_table = build_side_growable.build()?;
let probe_side_table =
input.take(&UInt64Array::from(("matches", input_idx_matches)).into_series())?;
let probe_side_table = probe_side_growable.build()?;

let final_table = if self.join_type == JoinType::Left {
if self.join_type == JoinType::Left {
let join_table = probe_side_table.get_columns(&self.common_join_keys)?;
let left = probe_side_table.get_columns(&self.left_non_join_columns)?;
let right = build_side_table.get_columns(&self.right_non_join_columns)?;
join_table.union(&left)?.union(&right)?
join_table.union(&left)?.union(&right)
} else {
let join_table = probe_side_table.get_columns(&self.common_join_keys)?;
let left = build_side_table.get_columns(&self.left_non_join_columns)?;
let right = probe_side_table.get_columns(&self.right_non_join_columns)?;
join_table.union(&left)?.union(&right)?
};
Ok(final_table)
join_table.union(&left)?.union(&right)
}
}
}

Expand Down Expand Up @@ -235,7 +239,7 @@ impl IntermediateOperator for HashJoinProbeOperator {
}
}?;
Ok(IntermediateOperatorResult::NeedMoreInput(Some(Arc::new(
out,
vec![out; 1],
))))
}
}
Expand Down
Loading

0 comments on commit 043b8c7

Please sign in to comment.