This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

hash aggregate working for small inputs - test_filter_project_aggregate
yashkothari42 committed Apr 19, 2024
1 parent 21a364c commit e53c4bb
Showing 7 changed files with 122 additions and 11 deletions.
27 changes: 27 additions & 0 deletions vayu/src/df2vayu.rs
@@ -1,13 +1,19 @@
use crate::operators::aggregate::AggregateOperator;
use crate::operators::filter::FilterOperator;
use crate::operators::join::HashProbeOperator;
use crate::operators::projection::ProjectionOperator;
use crate::Store;
use ahash::random_state::RandomSource;
use ahash::RandomState;
use arrow::array::BooleanBufferBuilder;
use arrow::compute::kernels::concat_elements;
use datafusion::datasource::physical_plan::CsvExec;
use datafusion::datasource::physical_plan::ParquetExec;
use datafusion::physical_plan::aggregates::AggregateExec;
use datafusion::physical_plan::aggregates::AggregateMode;
use datafusion::physical_plan::aggregates::StreamType;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::joins::hash_join::BuildSide;
use datafusion::physical_plan::joins::hash_join::BuildSideReadyState;
@@ -41,6 +47,18 @@ pub fn df2vayu(plan: Arc<dyn ExecutionPlan>, store: &mut Store, pipeline_id: i32
            sink: None,
        };
    }
    if let Some(exec) = p.downcast_ref::<AggregateExec>() {
        let mut pipeline = df2vayu(exec.input().clone(), store, pipeline_id);
        // only aggregates without a GROUP BY are handled; bail out otherwise
        if !exec.group_by().expr().is_empty() {
            panic!("group by present - not handled");
        }

        let tt = Box::new(AggregateOperator::new(exec));
        println!("adding aggregate");
        pipeline.operators.push(tt);
        return pipeline;
    }
    if let Some(exec) = p.downcast_ref::<FilterExec>() {
        let mut pipeline = df2vayu(exec.input().clone(), store, pipeline_id);
        let tt = Box::new(FilterOperator::new(exec.predicate().clone()));
@@ -87,6 +105,9 @@ pub fn df2vayu(plan: Arc<dyn ExecutionPlan>, store: &mut Store, pipeline_id: i32
    if let Some(exec) = p.downcast_ref::<CoalesceBatchesExec>() {
        return df2vayu(exec.input().clone(), store, pipeline_id);
    }
    if let Some(exec) = p.downcast_ref::<CoalescePartitionsExec>() {
        return df2vayu(exec.input().clone(), store, pipeline_id);
    }
    panic!("should never reach the end");
}

@@ -138,6 +159,12 @@ pub fn get_source_node(plan: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
    if let Some(_) = p.downcast_ref::<ParquetExec>() {
        return plan;
    }
    if let Some(exec) = p.downcast_ref::<AggregateExec>() {
        return get_source_node(exec.input().clone());
    }
    if let Some(exec) = p.downcast_ref::<CoalescePartitionsExec>() {
        return get_source_node(exec.input().clone());
    }
    if let Some(exec) = p.downcast_ref::<FilterExec>() {
        return get_source_node(exec.input().clone());
    }
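df2vayu peels DataFusion ExecutionPlan nodes recursively into a flat operator list, so a finished pipeline is driven by pushing each source batch through the operators in order. Below is a minimal sketch of that driving loop, assuming only the execute(&mut self, &RecordBatch) -> Result<RecordBatch> operator shape visible in aggregate.rs further down; the Pipeline struct and run driver here are illustrative, not this crate's actual API:

use datafusion::arrow::array::RecordBatch;
use datafusion::error::Result;

// Assumed operator shape, mirroring vayu_common::IntermediateOperator.
trait IntermediateOperator {
    fn execute(&mut self, input: &RecordBatch) -> Result<RecordBatch>;
}

struct Pipeline {
    operators: Vec<Box<dyn IntermediateOperator>>,
}

impl Pipeline {
    // Push one source batch through every operator in order; each operator
    // consumes the previous operator's output.
    fn run(&mut self, source_batch: RecordBatch) -> Result<RecordBatch> {
        let mut batch = source_batch;
        for op in self.operators.iter_mut() {
            batch = op.execute(&batch)?;
        }
        Ok(batch)
    }
}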
2 changes: 1 addition & 1 deletion vayu/src/lib.rs
@@ -31,7 +31,7 @@ impl VayuExecutionEngine {
        );
        match sink {
            vayu_common::SchedulerSinkType::PrintOutput => {
-                // pretty::print_batches(&result).unwrap();
+                pretty::print_batches(&result).unwrap();
            }
            // vayu_common::SchedulerSinkType::StoreRecordBatch(uuid) => {
            //     self.store.append(uuid, result);
79 changes: 79 additions & 0 deletions vayu/src/operators/aggregate.rs
@@ -0,0 +1,79 @@
use arrow::datatypes::SchemaRef;
use datafusion::arrow::array::RecordBatch;
use datafusion::error::Result;
use datafusion::physical_plan::aggregates::aggregate_expressions;
use datafusion::physical_plan::aggregates::create_accumulators;
use datafusion::physical_plan::aggregates::finalize_aggregation;
use datafusion::physical_plan::aggregates::no_grouping::aggregate_batch;
use datafusion::physical_plan::aggregates::AccumulatorItem;
use datafusion::physical_plan::aggregates::AggregateExec;
use datafusion::physical_plan::aggregates::AggregateMode;
use datafusion::physical_plan::PhysicalExpr;
use std::sync::Arc;
use vayu_common::{IntermediateOperator, PhysicalOperator};

/// Aggregate over the whole input with no GROUP BY: every batch is folded
/// into a set of accumulators, which are finalized into one output batch.
pub struct AggregateOperator {
    schema: SchemaRef,
    mode: AggregateMode,
    aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>,
    filter_expressions: Vec<Option<Arc<dyn PhysicalExpr>>>,
    accumulators: Vec<AccumulatorItem>,
}
impl AggregateOperator {
    pub fn new(agg: &AggregateExec) -> Self {
        let agg_filter_expr = agg.filter_expr().to_vec();
        let new_mode = *agg.mode();
        println!("mode: {:?}", new_mode);

        let aggregate_expressions =
            aggregate_expressions(agg.aggr_expr(), agg.mode(), 0).unwrap();
        // filters only apply while accumulating raw input (Partial/Single);
        // Final stages merge pre-aggregated state, so no filters are needed
        let filter_expressions = match new_mode {
            AggregateMode::Partial | AggregateMode::Single | AggregateMode::SinglePartitioned => {
                agg_filter_expr
            }
            AggregateMode::Final | AggregateMode::FinalPartitioned => {
                vec![None; agg.aggr_expr().len()]
            }
        };
        let accumulators = create_accumulators(agg.aggr_expr()).unwrap();
        AggregateOperator {
            schema: agg.schema(),
            mode: new_mode,
            aggregate_expressions,
            filter_expressions,
            accumulators,
        }
    }
}

impl IntermediateOperator for AggregateOperator {
    fn execute(&mut self, input: &RecordBatch) -> Result<RecordBatch> {
        // fold this batch into the accumulators
        aggregate_batch(
            &self.mode,
            input.clone(),
            &mut self.accumulators,
            &self.aggregate_expressions,
            &self.filter_expressions,
        )?;

        // NOTE: finalizing after every batch is only correct when the
        // pipeline delivers a single batch (hence "working for small
        // inputs"); finalization should eventually move to a run-once sink.
        let result = finalize_aggregation(&mut self.accumulators, &self.mode);
        println!("mode: {:?}", self.mode);
        let result = result.and_then(|columns| {
            RecordBatch::try_new(self.schema.clone(), columns).map_err(Into::into)
        });
        result
    }
}

impl PhysicalOperator for AggregateOperator {
    fn name(&self) -> String {
        String::from("aggregate")
    }
}
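Because execute() finalizes after every batch, a multi-batch input would emit one result set per batch instead of one per query — which is why this commit only claims correctness for small (single-batch) inputs. A toy sketch of the accumulate-per-batch / finalize-once split the operator eventually needs; CountAccumulator and its methods are illustrative stand-ins, not DataFusion's Accumulator API:

// Toy accumulator: update once per batch, evaluate exactly once at the end.
struct CountAccumulator {
    count: u64,
}

impl CountAccumulator {
    fn update_batch(&mut self, values: &[i64]) {
        self.count += values.len() as u64;
    }
    fn evaluate(&self) -> u64 {
        self.count
    }
}

fn main() {
    let mut acc = CountAccumulator { count: 0 };
    // accumulate across several batches...
    for batch in [[1i64, 2, 3].as_slice(), [4, 5].as_slice()] {
        acc.update_batch(batch);
    }
    // ...finalize once, after the last batch has been seen
    assert_eq!(acc.evaluate(), 5);
}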
1 change: 1 addition & 0 deletions vayu/src/operators/mod.rs
@@ -1,3 +1,4 @@
pub mod aggregate;
pub mod filter;
pub mod join;
pub mod projection;
4 changes: 2 additions & 2 deletions vayuDB/src/dummy_tasks.rs
@@ -12,7 +12,7 @@ use vayu::operators::join;
use vayu_common::DatafusionPipelineWithSource;
use vayu_common::Task;

-pub async fn test_filter_project() -> Result<Task> {
+pub async fn test_filter_project_aggregate() -> Result<Task> {
    // create local execution context
    let ctx: SessionContext = SessionContext::new();
    // register csv file with the execution context
@@ -23,7 +23,7 @@
        CsvReadOptions::new(),
    )
    .await?;
-    let sql = "SELECT c1,c3 as neg,c4 as pos,c13 FROM aggregate_test_100 WHERE (c3 < 0 AND c1='a') OR ( c4 > 0 AND c1='b' ) ";
+    let sql = "SELECT count(c1),sum(c3),sum(c4),count(c13) FROM aggregate_test_100 WHERE (c3 < 0 AND c1='a') OR ( c4 > 0 AND c1='b' ) ";
    let plan = get_execution_plan_from_sql(&ctx, sql).await?;
    let source = df2vayu::get_source_node(plan.clone());
    let mut task = Task::new();
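With the SQL switched to aggregates, the physical plan should now carry an AggregateExec (and, with multiple partitions, a CoalescePartitionsExec) above the filter/projection chain — exactly the new branches df2vayu gained above. A quick way to confirm, reusing the displayable helper that the tpch_tasks.rs change below un-comments; the snippet assumes the ctx and sql bindings of this function:

use datafusion::physical_plan::displayable;

let plan = get_execution_plan_from_sql(&ctx, sql).await?;
println!(
    "=== Physical plan ===\n{}\n",
    displayable(plan.as_ref()).indent(true)
);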
10 changes: 7 additions & 3 deletions vayuDB/src/scheduler.rs
@@ -1,4 +1,4 @@
-use crate::dummy_tasks::{test_filter_project, test_hash_join};
+use crate::dummy_tasks::{test_filter_project_aggregate, test_hash_join};
use crate::tpch_tasks::test_tpchq1;
use datafusion_benchmarks::tpch;
use std::{hash::Hash, task::Poll};
@@ -27,7 +27,11 @@ impl Scheduler {
    }

    pub fn get_pipeline(&mut self, id: i32) -> Poll<vayu_common::DatafusionPipelineWithSource> {
-        let mut task = futures::executor::block_on(test_tpchq1()).unwrap();
+        // let mut task = futures::executor::block_on(test_tpchq1()).unwrap();
+        // let pipeline = task.pipelines.remove(0);
+        // return Poll::Ready(pipeline);
+
+        let mut task = futures::executor::block_on(test_filter_project_aggregate()).unwrap();
        let pipeline = task.pipelines.remove(0);
        return Poll::Ready(pipeline);

@@ -45,7 +49,7 @@ impl Scheduler {
            let probe_pipeline = self.probe_pipeline.take().unwrap();
            return Poll::Ready(probe_pipeline);
        } else {
-            let mut task = futures::executor::block_on(test_filter_project()).unwrap();
+            let mut task = futures::executor::block_on(test_filter_project_aggregate()).unwrap();
            let pipeline = task.pipelines.remove(0);
            return Poll::Ready(pipeline);
            // return Poll::Pending;
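For context, a hedged sketch of how a caller might consume get_pipeline's Poll result; the scheduler binding and what happens on Ready are assumptions, not code from this repo:

use std::task::Poll;

match scheduler.get_pipeline(0) {
    Poll::Ready(_pipeline) => {
        // hand the pipeline (source + operators + sink) to the execution engine
    }
    Poll::Pending => {
        // nothing schedulable yet; retry on the next tick
    }
}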
10 changes: 5 additions & 5 deletions vayuDB/src/tpch_tasks.rs
@@ -58,7 +58,7 @@ pub async fn test_tpchq1() -> Result<Task> {
    let common = CommonOpt {
        iterations: 1,
        partitions: Some(2),
-        batch_size: 8192,
+        batch_size: 81920,
        debug: false,
    };
    let opt = RunOpt {
@@ -76,10 +76,10 @@
    let sql = queries.get(0).unwrap();

    let plan = get_execution_plan_from_sql(&ctx, sql).await.unwrap();
-    // println!(
-    //     "=== Physical plan ===\n{}\n",
-    //     displayable(plan.as_ref()).indent(true)
-    // );
+    println!(
+        "=== Physical plan ===\n{}\n",
+        displayable(plan.as_ref()).indent(true)
+    );
    let source = df2vayu::get_source_node(plan.clone());
    let mut task = Task::new();

