From 8fc81854a1539e633b997ade86093ffe1e869fb1 Mon Sep 17 00:00:00 2001
From: Yash Kothari
Date: Mon, 22 Apr 2024 14:10:03 -0400
Subject: [PATCH] aggregate-no-grouping

Support aggregates without grouping by storing per-thread partial results:
Store now maps each key to a Vec<Blob>, sinks append partial record batches,
and finalize steps (FinalAggregate, BuildAndStoreHashMap) combine them.
Also adds a DummyExec placeholder node so the probe side can rebuild a
HashJoinExec without executing its children, and a TPC-H q2 join task.
---
 arrow-datafusion         |   2 +-
 vayu-common/src/lib.rs   |   2 +-
 vayu-common/src/store.rs |  82 ++++++++++++++++-------------
 vayu/src/df2vayu.rs      |  43 ++++++++++++---
 vayu/src/dummy.rs        | 109 +++++++++++++++++++++++++++++++++++++++
 vayu/src/lib.rs          |  65 ++++++++++++++++++-----
 vayu/src/sinks.rs        |   2 +-
 vayuDB/src/main.rs       |  15 ++++++
 vayuDB/src/scheduler.rs  |  13 ++++-
 vayuDB/src/tpch_tasks.rs |  61 +++++++++++++++++++++-
 10 files changed, 333 insertions(+), 61 deletions(-)
 create mode 100644 vayu/src/dummy.rs

diff --git a/arrow-datafusion b/arrow-datafusion
index 86927c2..b971600 160000
--- a/arrow-datafusion
+++ b/arrow-datafusion
@@ -1 +1 @@
-Subproject commit 86927c245d937df7e3b70008b8ae3ee9a61b315d
+Subproject commit b9716007e187b5c2108f4800484d88c170f57cb8
diff --git a/vayu-common/src/lib.rs b/vayu-common/src/lib.rs
index f6a458a..d20f708 100644
--- a/vayu-common/src/lib.rs
+++ b/vayu-common/src/lib.rs
@@ -30,7 +30,6 @@ pub enum SchedulerSourceType {
 pub enum SchedulerSinkType {
     StoreRecordBatch(i32),
     // FinalAggregation(i32, AggregateOperator),
-    BuildAndStoreHashMap(i32, Arc<dyn ExecutionPlan>),
     PrintOutput,
 }
 
@@ -38,6 +37,7 @@ pub enum SchedulerSinkType {
 pub enum FinalizeSinkType {
     PrintFromStore(i32),
     FinalAggregate(Arc<dyn ExecutionPlan>, i32),
+    BuildAndStoreHashMap(i32, Arc<dyn ExecutionPlan>),
 }
 #[derive(Clone)]
 pub struct DatafusionPipeline {
diff --git a/vayu-common/src/store.rs b/vayu-common/src/store.rs
index 04ec3a4..14cc9a5 100644
--- a/vayu-common/src/store.rs
+++ b/vayu-common/src/store.rs
@@ -2,40 +2,43 @@ use arrow::record_batch::RecordBatch;
 use core::panic;
 use datafusion::physical_plan::joins::hash_join::JoinLeftData;
 use std::collections::HashMap;
+#[derive(Clone)]
 pub enum Blob {
-    RecordBatchBlob(Vec<RecordBatch>),
+    RecordBatchBlob(RecordBatch),
     HashMapBlob(JoinLeftData),
 }
-impl Blob {
-    pub fn get_map(self) -> JoinLeftData {
-        match self {
-            Blob::HashMapBlob(m) => m,
-            _ => panic!("error"),
-        }
-    }
-    pub fn get_records(self) -> Vec<RecordBatch> {
-        match self {
-            Blob::RecordBatchBlob(records) => records,
-            _ => panic!("error"),
-        }
-    }
-    pub fn append_records(&mut self, batches: Vec<RecordBatch>) {
-        match self {
-            Blob::RecordBatchBlob(records) => {
-                // TODO: check if schema is same
-                records.extend(batches)
-            }
-            _ => panic!("error"),
-        }
-    }
-}
+// impl Blob {
+//     // pub fn get_map(self) -> JoinLeftData {
+//     //     match self {
+//     //         Blob::HashMapBlob(m) => m,
+//     //         _ => panic!("error"),
+//     //     }
+//     // }
+//     pub fn get_records(self) -> Vec<RecordBatch> {
+//         match self {
+//             Blob::RecordBatchBlob(records) => records,
+//             _ => panic!("error"),
+//         }
+//     }
+//     pub fn append_records(&mut self, batches: Vec<RecordBatch>) {
+//         match self {
+//             Blob::RecordBatchBlob(records) => {
+//                 // TODO: check if schema is same
+//                 records.extend(batches)
+//             }
+//             _ => panic!("error"),
+//         }
+//     }
+// }
 
-// right now this is typedef of HashMap<i32, Blob>,
-// but we may need something else in near future
+// Store keeps a vector of blobs per key; each blob is the output of one
+// thread. The finalize step removes the vec of blobs, combines them, and
+// stores the result again.
+#[derive(Clone)]
 pub struct Store {
-    pub store: HashMap<i32, Blob>,
+    pub store: HashMap<i32, Vec<Blob>>,
 }
 impl Store {
     pub fn new() -> Store {
@@ -43,21 +46,30 @@ impl Store {
         Store {
             store: HashMap::new(),
         }
     }
-    pub fn insert(&mut self, key: i32, value: Blob) {
-        self.store.insert(key, value);
+    pub fn insert(&mut self, key: i32, value: Blob) {
+        let blob = self.store.get_mut(&key);
+        let blob = match blob {
+            Some(r) => r,
+            None => {
+                self.store.insert(key, vec![]);
+                self.store.get_mut(&key).unwrap()
+            }
+        };
+        blob.push(value);
     }
-    pub fn append(&mut self, key: i32, value: Vec<RecordBatch>) {
+    pub fn append(&mut self, key: i32, mut value: Vec<Blob>) {
         let blob = self.remove(key);
         let mut blob = match blob {
             Some(r) => r,
-            None => Blob::RecordBatchBlob(Vec::new()),
+            None => vec![],
         };
-        blob.append_records(value);
+        blob.append(&mut value);
         self.store.insert(key, blob);
     }
-    pub fn remove(&mut self, key: i32) -> Option<Blob> {
+    pub fn remove(&mut self, key: i32) -> Option<Vec<Blob>> {
         self.store.remove(&key)
-        // let x = self.store.remove(&key).unwrap().value();
-        // Some(x)
+    }
+    pub fn get(&mut self, key: i32) -> Option<&Vec<Blob>> {
+        self.store.get(&key)
     }
 }
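Note: the new Store semantics are the heart of this patch: insert pushes one Blob onto the Vec for a key, append extends it, and remove hands the whole Vec to a finalize step. Below is a minimal sketch of that contract, using simplified stand-ins rather than the actual vayu-common types (a String plays the role of a RecordBatch):

    use std::collections::HashMap;

    // Stand-in for vayu_common::store::Blob; the real blob holds a
    // RecordBatch or a join hash map.
    #[derive(Clone, Debug)]
    enum Blob {
        RecordBatchBlob(String),
    }

    #[derive(Default)]
    struct Store {
        store: HashMap<i32, Vec<Blob>>,
    }

    impl Store {
        // Each worker pushes its partial output under the pipeline's key.
        // entry().or_default() is a single-lookup equivalent of the patch's
        // get_mut-then-insert sequence.
        fn insert(&mut self, key: i32, value: Blob) {
            self.store.entry(key).or_default().push(value);
        }
        // The finalize step takes ownership of all partial results at once.
        fn remove(&mut self, key: i32) -> Option<Vec<Blob>> {
            self.store.remove(&key)
        }
    }

    fn main() {
        let mut store = Store::default();
        store.insert(55, Blob::RecordBatchBlob("batch from thread 0".into()));
        store.insert(55, Blob::RecordBatchBlob("batch from thread 1".into()));
        let blobs = store.remove(55).unwrap();
        assert_eq!(blobs.len(), 2);
    }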
diff --git a/vayu/src/df2vayu.rs b/vayu/src/df2vayu.rs
index 4ac0f62..db90fd3 100644
--- a/vayu/src/df2vayu.rs
+++ b/vayu/src/df2vayu.rs
@@ -1,3 +1,4 @@
+use crate::dummy;
 use crate::operators::aggregate::AggregateOperator;
 use crate::operators::filter::FilterOperator;
 use crate::operators::join::HashProbeOperator;
@@ -27,7 +28,9 @@ use std::sync::Arc;
 use vayu_common::VayuPipeline;
 
 pub fn df2vayu(plan: Arc<dyn ExecutionPlan>, store: &mut Store, pipeline_id: i32) -> VayuPipeline {
+    let plan2 = plan.clone();
     let p = plan.as_any();
+
     let batch_size = 1024;
     let config = SessionConfig::new().with_batch_size(batch_size);
     let ctx = Arc::new(SessionContext::new_with_config(config));
@@ -76,25 +79,53 @@ pub fn df2vayu(plan: Arc<dyn ExecutionPlan>, store: &mut Store, pipeline_id: i32
         return pipeline;
     }
     if let Some(exec) = p.downcast_ref::<HashJoinExec>() {
         // this function will only be called for the probe side;
         // the build side won't have a HashJoinExec in the make_pipeline call
         // let dummy = exec.left().execute(0, context.clone());
         let mut pipeline = df2vayu(exec.right().clone(), store, pipeline_id);
         println!("adding hashprobe");
-
-        let mut hashjoinstream = exec.get_hash_join_stream(0, context).unwrap();
-        println!("got joinstream");
-
+        let tt = dummy::DummyExec::new(
+            exec.properties().clone(),
+            exec.statistics().unwrap(),
+            exec.left().schema(),
+        );
+        let tt2 = dummy::DummyExec::new(
+            exec.properties().clone(),
+            exec.statistics().unwrap(),
+            exec.right().schema(),
+        );
+        let x = plan2
+            .with_new_children(vec![Arc::new(tt), Arc::new(tt2)])
+            .unwrap();
+        let x1 = x.as_any();
+        let exec = if let Some(exec) = x1.downcast_ref::<HashJoinExec>() {
+            exec
+        } else {
+            panic!("expected HashJoinExec after with_new_children");
+        };
         // using uuid but this value would be present in HashProbeExec itself
         // TODO: remove from the correct key
-        let build_map = store.remove(42).unwrap();
-        let left_data = Arc::new(build_map.get_map());
+        println!("{:?}", store.store.keys());
+        let mut build_map = store.remove(42).unwrap();
+        let cmap = build_map.clone();
+        store.append(42, cmap);
+        let map = build_map.remove(0);
+        let build_map = match map {
+            vayu_common::store::Blob::HashMapBlob(map) => map,
+            _ => panic!("expected HashMapBlob in store"),
+        };
+        let left_data = Arc::new(build_map);
         let visited_left_side = BooleanBufferBuilder::new(0);
+
+        let mut hashjoinstream = exec.get_hash_join_stream(0, context).unwrap();
         hashjoinstream.build_side = BuildSide::Ready(BuildSideReadyState {
             left_data,
             visited_left_side,
         });
+        println!("got joinstream");
         let tt = Box::new(HashProbeOperator::new(hashjoinstream));
         pipeline.operators.push(tt);
         return pipeline;
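Note: the probe path above swaps the join's inputs for DummyExec placeholders via with_new_children, then downcasts the rebuilt plan back to HashJoinExec to reach its type-specific get_hash_join_stream. That rebuild-and-downcast idea can be hard to follow in diff form; here is a minimal sketch of the same pattern over a toy plan trait (Plan, Placeholder, and Join are illustrative names, not DataFusion's API):

    use std::any::Any;
    use std::sync::Arc;

    trait Plan: Any {
        fn as_any(&self) -> &dyn Any;
        fn with_new_children(self: Arc<Self>, children: Vec<Arc<dyn Plan>>) -> Arc<dyn Plan>;
    }

    struct Placeholder; // plays the role of DummyExec

    struct Join {
        children: Vec<Arc<dyn Plan>>,
    }

    impl Plan for Placeholder {
        fn as_any(&self) -> &dyn Any {
            self
        }
        fn with_new_children(self: Arc<Self>, _children: Vec<Arc<dyn Plan>>) -> Arc<dyn Plan> {
            self // a leaf placeholder keeps no children
        }
    }

    impl Plan for Join {
        fn as_any(&self) -> &dyn Any {
            self
        }
        fn with_new_children(self: Arc<Self>, children: Vec<Arc<dyn Plan>>) -> Arc<dyn Plan> {
            Arc::new(Join { children })
        }
    }

    fn main() {
        let join: Arc<dyn Plan> = Arc::new(Join { children: vec![] });
        // Rebuild the node with placeholder inputs, then downcast back to
        // the concrete type to reach its type-specific methods.
        let rebuilt = join.with_new_children(vec![
            Arc::new(Placeholder) as Arc<dyn Plan>,
            Arc::new(Placeholder),
        ]);
        let join = rebuilt
            .as_any()
            .downcast_ref::<Join>()
            .expect("expected Join after with_new_children");
        assert_eq!(join.children.len(), 2);
    }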
diff --git a/vayu/src/dummy.rs b/vayu/src/dummy.rs
new file mode 100644
index 0000000..7b686bf
--- /dev/null
+++ b/vayu/src/dummy.rs
@@ -0,0 +1,109 @@
+use datafusion::physical_plan::ExecutionPlan;
+use std::any::Any;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::datatypes::Schema;
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use datafusion::common::Result;
+use datafusion::execution::TaskContext;
+use datafusion::physical_plan::metrics::MetricsSet;
+use datafusion::physical_plan::DisplayAs;
+use datafusion::physical_plan::DisplayFormatType;
+use datafusion::physical_plan::PlanProperties;
+use datafusion::physical_plan::RecordBatchStream;
+use datafusion::physical_plan::SendableRecordBatchStream;
+use datafusion::physical_plan::Statistics;
+use futures::stream::Stream;
+
+#[derive(Debug)]
+pub struct DummyExec {
+    cache: PlanProperties,
+    statistics: Statistics,
+    schema: Arc<Schema>,
+}
+
+impl DummyExec {
+    pub fn new(cache: PlanProperties, statistics: Statistics, schema: Arc<Schema>) -> Self {
+        DummyExec {
+            cache,
+            statistics,
+            schema,
+        }
+    }
+}
+
+impl ExecutionPlan for DummyExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        &self.cache
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        _children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(self)
+    }
+
+    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+        vec![]
+    }
+
+    fn maintains_input_order(&self) -> Vec<bool> {
+        vec![]
+    }
+
+    fn execute(
+        &self,
+        _partition: usize,
+        _context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        let stream = DummyStream {
+            schema: self.schema.clone(),
+        };
+        Ok(Box::pin(stream))
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        None
+    }
+
+    fn statistics(&self) -> Result<Statistics> {
+        Ok(self.statistics.clone())
+    }
+}
+
+impl DisplayAs for DummyExec {
+    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "DummyExec")?;
+        Ok(())
+    }
+}
+
+struct DummyStream {
+    schema: Arc<Schema>,
+}
+
+impl Stream for DummyStream {
+    type Item = Result<RecordBatch>;
+
+    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        // Never yields; DummyExec exists only to satisfy plan rewiring and
+        // must never actually be executed.
+        Poll::Pending
+    }
+}
+
+impl RecordBatchStream for DummyStream {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
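Note: DummyStream returns Poll::Pending without registering a waker, so any task that polls it parks forever; the stub only exists so with_new_children type-checks, and its execute path must never run. A minimal illustration of that contract, as a toy stream rather than the DataFusion types (assumes the futures crate, which the patch already depends on):

    use std::pin::Pin;
    use std::task::{Context, Poll};

    use futures::stream::Stream;

    // A stream that is never ready. Because poll_next never stores or wakes
    // a waker, awaiting it hangs forever, which is exactly why DummyExec
    // must never reach execution.
    struct NeverReady;

    impl Stream for NeverReady {
        type Item = ();
        fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<()>> {
            Poll::Pending
        }
    }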
diff --git a/vayu/src/lib.rs b/vayu/src/lib.rs
index 25aed4b..6195555 100644
--- a/vayu/src/lib.rs
+++ b/vayu/src/lib.rs
@@ -10,6 +10,7 @@ use std::sync::{Arc, Mutex};
 pub mod sinks;
 use vayu_common::store::Store;
 pub mod df2vayu;
+mod dummy;
 pub struct VayuExecutionEngine {
     // this is per node store
     // pub store: Store,
@@ -18,6 +19,27 @@ pub struct VayuExecutionEngine {
     // Note: only one of them will survive, let's see which
 }
 
+fn get_batches_from_record_batch_blob(
+    vblob: Vec<vayu_common::store::Blob>,
+) -> Vec<RecordBatch> {
+    let mut batches = vec![];
+    for val in vblob {
+        match val {
+            vayu_common::store::Blob::RecordBatchBlob(batch) => {
+                batches.push(batch);
+            }
+            vayu_common::store::Blob::HashMapBlob(_) => {
+                panic!("HashMapBlob cannot be flattened into record batches")
+            }
+        }
+    }
+    batches
+}
+fn get_record_batch_blob_from_batches(
+    batch: Vec<RecordBatch>,
+) -> Vec<vayu_common::store::Blob> {
+    let mut vblob = vec![];
+    for val in batch {
+        vblob.push(vayu_common::store::Blob::RecordBatchBlob(val));
+    }
+    vblob
+}
 impl VayuExecutionEngine {
     pub fn new(global_store: Arc<Mutex<Store>>) -> VayuExecutionEngine {
@@ -32,27 +54,48 @@ impl VayuExecutionEngine {
             vayu_common::FinalizeSinkType::PrintFromStore(uuid) => {
                 println!("running print from store {uuid}");
                 let mut store = self.global_store.lock().unwrap();
-                let blob = store.remove(uuid);
                 println!("{:?}", store.store.keys());
+                let blob = store.remove(uuid);
                 drop(store);
-                let result = blob.unwrap().get_records();
-                pretty::print_batches(&result).unwrap();
+                let result = blob.unwrap();
+                let batches = get_batches_from_record_batch_blob(result);
+                // pretty::print_batches(&batches).unwrap();
             }
             vayu_common::FinalizeSinkType::FinalAggregate(plan, uuid) => {
                 println!("running FinalAggregate from store {uuid}");
                 let mut store = self.global_store.lock().unwrap();
-                let blob = store.remove(uuid);
                 println!("{:?}", store.store.keys());
+                let blob = store.remove(uuid);
                 drop(store);
-                let result = blob.unwrap().get_records();
+                let result = blob.unwrap();
+                let batches = get_batches_from_record_batch_blob(result);
+
                 let mut operator = df2vayu::aggregate(plan);
-                let batch = arrow::compute::concat_batches(&result[0].schema(), &result).unwrap();
+                let batch =
+                    arrow::compute::concat_batches(&batches[0].schema(), &batches).unwrap();
                 let result = operator.execute(&batch).unwrap();
                 pretty::print_batches(&[result.clone()]).unwrap();
             }
+            vayu_common::FinalizeSinkType::BuildAndStoreHashMap(uuid, join_node) => {
+                let mut sink = sinks::HashMapSink::new(uuid, join_node);
+
+                let mut store = self.global_store.lock().unwrap();
+                println!("{:?}", store.store.keys());
+                let blob = store.remove(uuid).unwrap();
+                drop(store);
+                let batches = get_batches_from_record_batch_blob(blob);
+
+                let hashmap = sink.build_map(batches);
+                let mut store = self.global_store.lock().unwrap();
+                store.insert(uuid, hashmap.unwrap());
+                drop(store);
+                println!("storing the map {uuid}");
+            }
         }
     }
     pub fn sink(&mut self, sink: vayu_common::SchedulerSinkType, result: Vec<RecordBatch>) {
@@ -68,18 +111,12 @@ impl VayuExecutionEngine {
             vayu_common::SchedulerSinkType::StoreRecordBatch(uuid) => {
                 println!("storing at store {uuid}");
                 let mut store = self.global_store.lock().unwrap();
-                store.append(uuid, result);
+                let blobs = get_record_batch_blob_from_batches(result);
+                store.append(uuid, blobs);
                 println!("{:?}", store.store.keys());
                 drop(store);
             }
-            vayu_common::SchedulerSinkType::BuildAndStoreHashMap(uuid, join_node) => {
-                let mut sink = sinks::HashMapSink::new(uuid, join_node);
-                let hashmap = sink.build_map(result);
-                println!("BuildAndStoreHashMap storing in uuid {uuid}");
-                let mut map = self.global_store.lock().unwrap();
-                map.insert(uuid, hashmap.unwrap());
-            }
         };
     }
     pub fn execute(&mut self, pipeline: DatafusionPipelineWithData) {
diff --git a/vayu/src/sinks.rs b/vayu/src/sinks.rs
index f3daa99..a651241 100644
--- a/vayu/src/sinks.rs
+++ b/vayu/src/sinks.rs
@@ -39,7 +39,7 @@ impl HashMapSink {
         let reservation =
             MemoryConsumer::new("HashJoinInput").register(ctx.task_ctx().memory_pool());
 
-        let hash_map = hash_join::create_hash_build_map(
+        let hash_map: hash_join::JoinLeftData = hash_join::create_hash_build_map(
             result,
             random_state,
             self.on_left.clone(),
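Note: every finalize arm above follows the same locking discipline: lock the global store, move the blobs out, drop the lock before any heavy work, then re-lock only to publish the result. A condensed, runnable sketch of that pattern (simplified types; the string join stands in for building the real join hash map, and the finalize function here is hypothetical):

    use std::sync::{Arc, Mutex};

    fn finalize(store: &Arc<Mutex<Vec<String>>>) {
        // 1. Lock only long enough to move the partial results out.
        let batches = {
            let mut guard = store.lock().unwrap();
            std::mem::take(&mut *guard)
        }; // lock released here, before the expensive build

        // 2. Heavy work happens without the lock, so other workers can keep
        //    sinking batches into the store.
        let built = batches.join(",");

        // 3. Re-lock briefly to publish the result.
        store.lock().unwrap().push(built);
    }

    fn main() {
        let store = Arc::new(Mutex::new(vec!["b0".to_string(), "b1".to_string()]));
        finalize(&store);
        assert_eq!(*store.lock().unwrap(), vec!["b0,b1".to_string()]);
    }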
diff --git a/vayuDB/src/main.rs b/vayuDB/src/main.rs
index 610c04f..1e4f053 100644
--- a/vayuDB/src/main.rs
+++ b/vayuDB/src/main.rs
@@ -138,6 +138,21 @@ fn main() {
             }
             None => {
                 println!("inform scheduler we are done");
+                // let pipeline = scheduler.get_pipeline(next_id);
+                // if let Poll::Ready(mut pipeline) = pipeline {
+                //     // TODO: add support for multiple dependent pipelines
+                //     println!("got a pipeline from scheduler");
+
+                //     let source = pipeline.source.take().unwrap();
+                //     // submit the source request to the io service
+                //     let request_num = io_service.submit_request(source);
+                //     println!("sent the request to the io_service");
+
+                //     // insert the pipeline into the local map
+                //     request_pipeline_map.insert(request_num, (pipeline, 0));
+                //     next_id += 1;
+                // }
+                // scheduler.ack_pipeline(request_num);
             }
         }
diff --git a/vayuDB/src/scheduler.rs b/vayuDB/src/scheduler.rs
index 7b7d35b..2fb2b9a 100644
--- a/vayuDB/src/scheduler.rs
+++ b/vayuDB/src/scheduler.rs
@@ -1,5 +1,5 @@
 // use crate::dummy_tasks::test_hash_join;
-use crate::tpch_tasks::test_tpchq1;
+use crate::tpch_tasks::{test_tpchq1, test_tpchq2};
 use datafusion_benchmarks::tpch;
 use std::{hash::Hash, task::Poll};
 use vayu_common::SchedulerPipeline;
@@ -31,6 +31,17 @@ impl Scheduler {
         let pipeline = task.pipelines.remove(0);
         return Poll::Ready(pipeline);
 
+        // if self.probe_pipeline.is_none() {
+        //     let mut task = futures::executor::block_on(test_tpchq2()).unwrap();
+        //     let pipeline = task.pipelines.remove(0);
+        //     self.probe_pipeline = Some(task.pipelines.remove(0));
+        //     self.state = HashJoinState::BuildSent(id);
+        //     return Poll::Ready(pipeline);
+        // } else {
+        //     let pipeline = self.probe_pipeline.take();
+        //     return Poll::Ready(pipeline.unwrap());
+        // }
+
         // let mut task = futures::executor::block_on(test_filter_project_aggregate()).unwrap();
         // let pipeline = task.pipelines.remove(0);
         // return Poll::Ready(pipeline);
diff --git a/vayuDB/src/tpch_tasks.rs b/vayuDB/src/tpch_tasks.rs
index 62a47ed..7c3164a 100644
--- a/vayuDB/src/tpch_tasks.rs
+++ b/vayuDB/src/tpch_tasks.rs
@@ -55,7 +55,7 @@ pub fn get_query_sql(query: usize) -> Result<Vec<String>> {
     }
 }
 
-pub async fn test_tpchq1() -> Result<Task> {
+pub async fn tpch_common() -> Result<SessionContext> {
     let ctx = SessionContext::default();
     let path = get_tpch_data_path()?;
     let common = CommonOpt {
@@ -74,13 +74,17 @@ pub async fn tpch_common() -> Result<SessionContext> {
         disable_statistics: false,
     };
     opt.register_tables(&ctx).await.unwrap();
+    return Ok(ctx);
+}
+
+pub async fn test_tpchq1() -> Result<Task> {
+    // this is the aggregate query
+    let ctx = tpch_common().await.unwrap();
     let queries = get_query_sql(1).unwrap();
     // println!("{:?}", queries);
     let sql = queries.get(0).unwrap();
 
     let plan = get_execution_plan_from_sql(&ctx, sql).await.unwrap();
     let final_aggregate = plan.clone();
-    // let final_aggregate = AggregateOperator::new(final_aggregate);
     let plan = plan.children().get(0).unwrap().clone();
     println!(
         "=== Physical plan ===\n{}\n",
         displayable(plan.as_ref()).indent(true)
     );
     let source = Some(df2vayu::get_source_node(plan.clone()));
+
     let mut task = Task::new();
 
     let uuid = 55;
@@ -108,3 +113,55 @@ pub async fn test_tpchq1() -> Result<Task> {
 
     return Ok(task);
 }
+
+pub async fn test_tpchq2() -> Result<Task> {
+    // this is the join query
+    let ctx = tpch_common().await.unwrap();
+    let queries = get_query_sql(2).unwrap();
+    // println!("{:?}", queries);
+    let sql = queries.get(0).unwrap();
+
+    let plan = get_execution_plan_from_sql(&ctx, sql).await.unwrap();
+
+    let plan = plan.children().get(0).unwrap().clone();
+    println!(
+        "=== Physical plan ===\n{}\n",
+        displayable(plan.as_ref()).indent(true)
+    );
+
+    let uuid = 42;
+    let mut task = Task::new();
+
+    let (join_node, build_plan) = df2vayu::get_hash_build_pipeline(plan.clone(), uuid);
+
+    let build_source_pipeline = Some(df2vayu::get_source_node(build_plan.clone()));
+    let build_pipeline = DatafusionPipeline {
+        plan: build_plan,
+        sink: Some(SchedulerSinkType::StoreRecordBatch(uuid)),
+        id: 1,
+    };
+    let build_pipeline = SchedulerPipeline {
+        source: build_source_pipeline,
+        pipeline: build_pipeline,
+        finalize: vayu_common::FinalizeSinkType::BuildAndStoreHashMap(uuid, join_node),
+    };
+    task.add_pipeline(build_pipeline);
+
+    let uuid2 = 98;
+    // TODO: set this uuid in the probe side as well
+    let probe_plan = plan.clone();
+    let probe_source_node = Some(df2vayu::get_source_node(probe_plan.clone()));
+    let probe_pipeline = DatafusionPipeline {
+        plan: probe_plan,
+        sink: Some(SchedulerSinkType::StoreRecordBatch(uuid2)),
+        id: 1,
+    };
+
+    let probe_pipeline = SchedulerPipeline {
+        source: probe_source_node,
+        pipeline: probe_pipeline,
+        finalize: vayu_common::FinalizeSinkType::PrintFromStore(uuid2),
+    };
+    task.add_pipeline(probe_pipeline);
+
+    return Ok(task);
+}
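Note: test_tpchq2 wires two pipelines into one task, and their order matters: the build pipeline sinks batches under key 42 and its finalize publishes the join hash map there, while the probe pipeline's df2vayu path does store.remove(42).unwrap() and would panic if the build finalize had not run yet. A toy sketch of that ordering constraint (simplified types and assumed semantics, not the actual vayu scheduler):

    use std::collections::HashMap;

    #[derive(Default)]
    struct Store {
        maps: HashMap<i32, &'static str>,
    }

    fn run_build_finalize(store: &mut Store) {
        // corresponds to FinalizeSinkType::BuildAndStoreHashMap(42, join_node)
        store.maps.insert(42, "join hash map");
    }

    fn run_probe(store: &mut Store) {
        // corresponds to the probe side's store.remove(42).unwrap()
        let _map = store.maps.remove(&42).expect("build must finish first");
    }

    fn main() {
        let mut store = Store::default();
        run_build_finalize(&mut store);
        run_probe(&mut store);
    }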