This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit

hash aggregate running on multiple threads
yashkothari42 committed Apr 21, 2024
1 parent 8a2b3de commit f9658bb
Showing 10 changed files with 335 additions and 202 deletions.
29 changes: 22 additions & 7 deletions vayu-common/src/lib.rs
@@ -1,8 +1,10 @@
use arrow::record_batch::RecordBatch;
use datafusion::common::Result;
+use datafusion::physical_plan::aggregates::AggregateMode;
use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
use std::sync::Arc;
pub mod store;
+// use vayu::operators::aggregate::AggregateOperator;

pub trait PhysicalOperator {
fn name(&self) -> String;
@@ -26,20 +28,28 @@ pub enum SchedulerSourceType {

+#[derive(Clone)]
pub enum SchedulerSinkType {
-    // StoreRecordBatch(i32),
+    StoreRecordBatch(i32),
// FinalAggregation(i32, AggregateOperator),
BuildAndStoreHashMap(i32, Arc<dyn ExecutionPlan>),
PrintOutput,
}

+#[derive(Clone)]
+pub enum FinalizeSinkType {
+    PrintFromStore(i32),
+    FinalAggregate(Arc<dyn ExecutionPlan>, i32),
+}
+#[derive(Clone)]
pub struct DatafusionPipeline {
pub plan: Arc<dyn ExecutionPlan>,
pub sink: Option<SchedulerSinkType>,
pub id: i32,
}
-pub struct DatafusionPipelineWithSource {
-    pub source: Arc<dyn ExecutionPlan>,
-    pub plan: Arc<dyn ExecutionPlan>,
-    pub sink: Option<SchedulerSinkType>,
+#[derive(Clone)]
+pub struct SchedulerPipeline {
+    pub source: Option<Arc<dyn ExecutionPlan>>,
+    pub pipeline: DatafusionPipeline,
+    pub finalize: FinalizeSinkType,
}

pub struct DatafusionPipelineWithData {
@@ -56,13 +66,18 @@ pub struct VayuPipelineWithData {
pub data: RecordBatch,
}
pub struct Task {
-    pub pipelines: Vec<DatafusionPipelineWithSource>,
+    pub pipelines: Vec<SchedulerPipeline>,
}

+pub enum VayuMessage {
+    Normal(DatafusionPipelineWithData),
+    Finalize((FinalizeSinkType, i32)),
+}
impl Task {
pub fn new() -> Self {
Task { pipelines: vec![] }
}
-    pub fn add_pipeline(&mut self, pipeline: DatafusionPipelineWithSource) {
+    pub fn add_pipeline(&mut self, pipeline: SchedulerPipeline) {
self.pipelines.push(pipeline);
}
}
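
Read together, the new types split a query into a per-thread pipeline (a `DatafusionPipeline` plus an optional source and sink) and a finalize step that runs once every thread has produced its partial output. Below is a minimal sketch of how a two-phase aggregate task might be assembled from them; the helper function, its parameters, and the shared `store_id` are illustrative and not part of this commit.

```
use std::sync::Arc;

use datafusion::physical_plan::ExecutionPlan;
use vayu_common::{
    DatafusionPipeline, FinalizeSinkType, SchedulerPipeline, SchedulerSinkType, Task,
};

// Hypothetical helper: build a two-phase aggregate task. Each worker runs the
// partial-aggregate pipeline and stores its batches under `store_id`; the
// finalize step later merges them with the final-mode aggregate plan.
fn make_aggregate_task(
    source: Arc<dyn ExecutionPlan>,
    partial_plan: Arc<dyn ExecutionPlan>,
    final_plan: Arc<dyn ExecutionPlan>,
    store_id: i32,
) -> Task {
    let mut task = Task::new();
    task.add_pipeline(SchedulerPipeline {
        source: Some(source),
        pipeline: DatafusionPipeline {
            plan: partial_plan,
            sink: Some(SchedulerSinkType::StoreRecordBatch(store_id)),
            id: store_id, // reusing the store key as the pipeline id, for illustration
        },
        finalize: FinalizeSinkType::FinalAggregate(final_plan, store_id),
    });
    task
}
```
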
53 changes: 27 additions & 26 deletions vayu-common/src/store.rs
@@ -1,8 +1,9 @@
use arrow::record_batch::RecordBatch;
use core::panic;
use datafusion::physical_plan::joins::hash_join::JoinLeftData;
use std::collections::HashMap;
pub enum Blob {
-    // RecordBatchBlob(Vec<RecordBatch>),
+    RecordBatchBlob(Vec<RecordBatch>),
HashMapBlob(JoinLeftData),
}

@@ -13,28 +14,28 @@ impl Blob {
_ => panic!("error"),
}
}
-    // pub fn get_records(self) -> Vec<RecordBatch> {
-    //     match self {
-    //         Blob::RecordBatchBlob(records) => records,
-    //         _ => panic!("error"),
-    //     }
-    // }
-    // pub fn append_records(&mut self, batches: Vec<RecordBatch>) {
-    //     match self {
-    //         Blob::RecordBatchBlob(records) => {
-    //             // TODO: check if schema is same
-    //             records.extend(batches)
-    //         }
-    //         _ => panic!("error"),
-    //     }
-    // }
+    pub fn get_records(self) -> Vec<RecordBatch> {
+        match self {
+            Blob::RecordBatchBlob(records) => records,
+            _ => panic!("error"),
+        }
+    }
+    pub fn append_records(&mut self, batches: Vec<RecordBatch>) {
+        match self {
+            Blob::RecordBatchBlob(records) => {
+                // TODO: check if schema is same
+                records.extend(batches)
+            }
+            _ => panic!("error"),
+        }
+    }
}

// right now this is typedef of HashMap<i32, Blob>,
// but we may need something else in near future

pub struct Store {
-    store: HashMap<i32, Blob>,
+    pub store: HashMap<i32, Blob>,
}
impl Store {
pub fn new() -> Store {
@@ -45,15 +46,15 @@ impl Store {
pub fn insert(&mut self, key: i32, value: Blob) {
self.store.insert(key, value);
}
-    // pub fn append(&mut self, key: i32, value: Vec<RecordBatch>) {
-    //     let blob = self.remove(key);
-    //     let mut blob = match blob {
-    //         Some(r) => r,
-    //         None => Blob::RecordBatchBlob(Vec::new()),
-    //     };
-    //     blob.append_records(value);
-    //     self.store.insert(key, blob);
-    // }
+    pub fn append(&mut self, key: i32, value: Vec<RecordBatch>) {
+        let blob = self.remove(key);
+        let mut blob = match blob {
+            Some(r) => r,
+            None => Blob::RecordBatchBlob(Vec::new()),
+        };
+        blob.append_records(value);
+        self.store.insert(key, blob);
+    }
pub fn remove(&mut self, key: i32) -> Option<Blob> {
self.store.remove(&key)
// let x = self.store.remove(&key).unwrap().value();
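
With `append`, `get_records`, and the now-public `store` map re-enabled, several worker threads can deposit partial aggregate batches under one key, and a later finalize pass can take them all out at once. A small usage sketch, assuming the batches come from a worker and using an arbitrary key of `42`:

```
use std::sync::{Arc, Mutex};

use arrow::record_batch::RecordBatch;
use vayu_common::store::{Blob, Store};

fn demo(batches_from_worker: Vec<RecordBatch>) {
    let global_store = Arc::new(Mutex::new(Store::new()));

    // Each worker appends its partial results under the pipeline's store id.
    {
        let mut store = global_store.lock().unwrap();
        store.append(42, batches_from_worker);
    } // lock released here, as the engine does explicitly with drop(store)

    // The finalize step later removes the blob and takes ownership of the batches.
    if let Some(blob) = global_store.lock().unwrap().remove(42) {
        let _partials: Vec<RecordBatch> = blob.get_records();
    }
}
```
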
14 changes: 14 additions & 0 deletions vayu/src/df2vayu.rs
@@ -179,3 +179,17 @@ pub fn get_source_node(plan: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
}
panic!("No source node found");
}

+pub fn aggregate(exec: Arc<dyn ExecutionPlan>) -> AggregateOperator {
+    let p = exec.as_any();
+    let final_aggregate = if let Some(exec) = p.downcast_ref::<AggregateExec>() {
+        if !exec.group_by().expr().is_empty() {
+            panic!("group by present- not handled");
+        }
+        let tt = AggregateOperator::new(exec);
+        tt
+    } else {
+        panic!("not an aggregate");
+    };
+    final_aggregate
+}
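
A short sketch of how this helper is meant to be driven: the caller converts the ungrouped `AggregateExec` plan into a Vayu operator, then evaluates it over a single already-concatenated batch. The wrapper function and its arguments are illustrative, and it assumes `execute` is exposed through the `IntermediateOperator` trait imported below.

```
use std::sync::Arc;

use arrow::record_batch::RecordBatch;
use datafusion::physical_plan::ExecutionPlan;
use vayu::df2vayu;
use vayu_common::IntermediateOperator;

// `plan` is expected to be an ungrouped AggregateExec; `batch` is the
// concatenation of all partial results for that aggregate.
fn run_final_aggregate(plan: Arc<dyn ExecutionPlan>, batch: &RecordBatch) -> RecordBatch {
    let mut operator = df2vayu::aggregate(plan);
    operator.execute(batch).unwrap()
}
```
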
53 changes: 43 additions & 10 deletions vayu/src/lib.rs
@@ -1,16 +1,18 @@
use arrow::array::RecordBatch;
use arrow::util::pretty;
use vayu_common::DatafusionPipelineWithData;
use vayu_common::IntermediateOperator;
use vayu_common::VayuPipeline;
pub mod operators;
+use crate::operators::aggregate::AggregateOperator;
+use datafusion::physical_plan::coalesce_batches::concat_batches;
use std::sync::{Arc, Mutex};

pub mod sinks;
use vayu_common::store::Store;
pub mod df2vayu;
pub struct VayuExecutionEngine {
// this is per node store
-    pub store: Store,
+    // pub store: Store,
// this is global store
pub global_store: Arc<Mutex<Store>>,
// Note: only one of them will survive lets see which
@@ -19,10 +21,40 @@ impl VayuExecutionEngine {
impl VayuExecutionEngine {
pub fn new(global_store: Arc<Mutex<Store>>) -> VayuExecutionEngine {
VayuExecutionEngine {
-            store: Store::new(),
+            // store: Store::new(),
global_store,
}
}
+    pub fn finalize(&mut self, sink: vayu_common::FinalizeSinkType) {
+        println!("running finalize");
+
+        match sink {
+            vayu_common::FinalizeSinkType::PrintFromStore(uuid) => {
+                println!("running print from store {uuid}");
+                let mut store = self.global_store.lock().unwrap();
+                let blob = store.remove(uuid);
+                println!("{:?}", store.store.keys());
+
+                drop(store);
+                let result = blob.unwrap().get_records();
+                pretty::print_batches(&result).unwrap();
+            }
+            vayu_common::FinalizeSinkType::FinalAggregate(plan, uuid) => {
+                println!("running FinalAggregate from store {uuid}");
+                let mut store = self.global_store.lock().unwrap();
+                let blob = store.remove(uuid);
+                println!("{:?}", store.store.keys());
+
+                drop(store);
+                let result = blob.unwrap().get_records();
+                let mut operator = df2vayu::aggregate(plan);
+                let batch = arrow::compute::concat_batches(&result[0].schema(), &result).unwrap();
+
+                let result = operator.execute(&batch).unwrap();
+                pretty::print_batches(&[result.clone()]).unwrap();
+            }
+        }
+    }
pub fn sink(&mut self, sink: vayu_common::SchedulerSinkType, result: Vec<RecordBatch>) {
println!(
"runningsink size {}x{}",
@@ -33,17 +65,18 @@ impl VayuExecutionEngine {
vayu_common::SchedulerSinkType::PrintOutput => {
pretty::print_batches(&result).unwrap();
}
-            // vayu_common::SchedulerSinkType::StoreRecordBatch(uuid) => {
-            //     self.store.append(uuid, result);
-            // }
+            vayu_common::SchedulerSinkType::StoreRecordBatch(uuid) => {
+                println!("storing at store {uuid}");
+                let mut store = self.global_store.lock().unwrap();
+                store.append(uuid, result);
+
+                println!("{:?}", store.store.keys());
+                drop(store);
+            }
vayu_common::SchedulerSinkType::BuildAndStoreHashMap(uuid, join_node) => {
let mut sink = sinks::HashMapSink::new(uuid, join_node);
let hashmap = sink.build_map(result);
println!("BuildAndStoreHashMap storing in uuid {uuid}");

-            // old store
-            // self.store.insert(uuid, hashmap.unwrap());
-            // new store
let mut map = self.global_store.lock().unwrap();
map.insert(uuid, hashmap.unwrap());
}
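
Together, the sink and finalize halves above are what make the multi-threaded hash aggregate work: every worker thread appends its partial aggregate output to the shared store under the same id, and a single finalize pass concatenates the partials and re-aggregates them. A condensed, single-threaded sketch of that hand-off, assuming each worker has already produced its partial batches and that `final_plan` is the final-mode `AggregateExec`; the inputs and the `store_id` value are illustrative:

```
use std::sync::{Arc, Mutex};

use arrow::record_batch::RecordBatch;
use datafusion::physical_plan::ExecutionPlan;
use vayu::VayuExecutionEngine;
use vayu_common::store::Store;
use vayu_common::{FinalizeSinkType, SchedulerSinkType};

fn two_phase_aggregate(
    per_worker_partials: Vec<Vec<RecordBatch>>,
    final_plan: Arc<dyn ExecutionPlan>,
) {
    let global_store = Arc::new(Mutex::new(Store::new()));
    let store_id = 7; // illustrative id shared by all workers

    // Phase 1: each worker's engine stores its partial aggregate batches.
    // (In the real system each worker thread owns its own engine.)
    for partials in per_worker_partials {
        let mut engine = VayuExecutionEngine::new(Arc::clone(&global_store));
        engine.sink(SchedulerSinkType::StoreRecordBatch(store_id), partials);
    }

    // Phase 2: one finalize pass merges the partials and prints the result.
    let mut engine = VayuExecutionEngine::new(Arc::clone(&global_store));
    engine.finalize(FinalizeSinkType::FinalAggregate(final_plan, store_id));
}
```
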
2 changes: 1 addition & 1 deletion vayu/src/sinks.rs
@@ -34,7 +34,7 @@ impl HashMapSink {
}
}
pub fn build_map(&mut self, result: Vec<RecordBatch>) -> Option<Blob> {
-        let random_state = RandomState::with_seeds(0, 0, 0, 0);
+        let random_state: RandomState = RandomState::with_seeds(0, 0, 0, 0);
let ctx: SessionContext = SessionContext::new();
let reservation =
MemoryConsumer::new("HashJoinInput").register(ctx.task_ctx().memory_pool());
4 changes: 2 additions & 2 deletions vayuDB/README.md
@@ -48,7 +48,7 @@ Scheduler will keep on sending the tasks.

## Common Vayu Structures
```
-pub struct DatafusionPipelineWithSource {
+pub struct SchedulerPipeline {
pub source: Arc<dyn ExecutionPlan>,
pub plan: Arc<dyn ExecutionPlan>,
pub sink: SchedulerSinkType,
@@ -60,7 +60,7 @@ pub struct DatafusionPipelineWithSource {
1. Scheduler
```
pub fn new() -> Self
-pub fn get_pipeline(&mut self) -> Poll<vayu_common::DatafusionPipelineWithSource>
+pub fn get_pipeline(&mut self) -> Poll<vayu_common::SchedulerPipeline>
pub fn ack_pipeline(&mut self, pipeline_id: i32);
```

Expand Down
Loading

