This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit: aggregate-no-grouping
yashkothari42 committed Apr 22, 2024
1 parent f9658bb commit 8fc8185
Showing 10 changed files with 333 additions and 61 deletions.
2 changes: 1 addition & 1 deletion vayu-common/src/lib.rs
@@ -30,14 +30,14 @@ pub enum SchedulerSourceType {
pub enum SchedulerSinkType {
    StoreRecordBatch(i32),
    // FinalAggregation(i32, AggregateOperator),
-    BuildAndStoreHashMap(i32, Arc<dyn ExecutionPlan>),
    PrintOutput,
}

#[derive(Clone)]
pub enum FinalizeSinkType {
    PrintFromStore(i32),
    FinalAggregate(Arc<dyn ExecutionPlan>, i32),
+    BuildAndStoreHashMap(i32, Arc<dyn ExecutionPlan>),
}
#[derive(Clone)]
pub struct DatafusionPipeline {
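For context, a minimal sketch (not part of this commit) of how a finalize step might dispatch on these variants; the function shape and store handling are assumptions, only the FinalizeSinkType variants and the Store API are from this repo:

use vayu_common::store::Store;
use vayu_common::FinalizeSinkType;

// Hypothetical finalize dispatch; runs once a pipeline's threads are done.
fn finalize(sink: FinalizeSinkType, store: &mut Store) {
    match sink {
        FinalizeSinkType::PrintFromStore(uuid) => {
            // drain and print the record batches accumulated under `uuid`
            let _blobs = store.remove(uuid);
        }
        FinalizeSinkType::FinalAggregate(_plan, uuid) => {
            // merge the per-thread partial aggregates stored under `uuid`
            let _partials = store.remove(uuid);
        }
        FinalizeSinkType::BuildAndStoreHashMap(uuid, _plan) => {
            // build the join hash table from the batches under `uuid` and
            // store it back as a Blob::HashMapBlob for the probe pipeline
            let _input = store.remove(uuid);
        }
    }
}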
82 changes: 47 additions & 35 deletions vayu-common/src/store.rs
@@ -2,62 +2,74 @@ use arrow::record_batch::RecordBatch;
use core::panic;
use datafusion::physical_plan::joins::hash_join::JoinLeftData;
use std::collections::HashMap;
#[derive(Clone)]
pub enum Blob {
-    RecordBatchBlob(Vec<RecordBatch>),
+    RecordBatchBlob(RecordBatch),
    HashMapBlob(JoinLeftData),
}

-impl Blob {
-    pub fn get_map(self) -> JoinLeftData {
-        match self {
-            Blob::HashMapBlob(m) => m,
-            _ => panic!("error"),
-        }
-    }
-    pub fn get_records(self) -> Vec<RecordBatch> {
-        match self {
-            Blob::RecordBatchBlob(records) => records,
-            _ => panic!("error"),
-        }
-    }
-    pub fn append_records(&mut self, batches: Vec<RecordBatch>) {
-        match self {
-            Blob::RecordBatchBlob(records) => {
-                // TODO: check if schema is same
-                records.extend(batches)
-            }
-            _ => panic!("error"),
-        }
-    }
-}
+// impl Blob {
+// //     pub fn get_map(self) -> JoinLeftData {
+// //         match self {
+// //             Blob::HashMapBlob(m) => m,
+// //             _ => panic!("error"),
+// //         }
+// //     }
+//     pub fn get_records(self) -> Vec<RecordBatch> {
+//         match self {
+//             Blob::RecordBatchBlob(records) => records,
+//             _ => panic!("error"),
+//         }
+//     }
+//     pub fn append_records(&mut self, batches: Vec<RecordBatch>) {
+//         match self {
+//             Blob::RecordBatchBlob(records) => {
+//                 // TODO: check if schema is same
+//                 records.extend(batches)
+//             }
+//             _ => panic!("error"),
+//         }
+//     }
+// }

// right now this is a typedef of HashMap<i32, Vec<Blob>>,
// but we may need something else in the near future
// the store keeps a vector of blobs per key
// each blob would be the output of one of the threads
// the finalize step would remove the vec of blobs, combine them, and store the result again

#[derive(Clone)]
pub struct Store {
-    pub store: HashMap<i32, Blob>,
+    pub store: HashMap<i32, Vec<Blob>>,
}
impl Store {
    pub fn new() -> Store {
        Store {
            store: HashMap::new(),
        }
    }
-    pub fn insert(&mut self, key: i32, value: Blob) {
-        self.store.insert(key, value);
-    }
+    pub fn insert(&mut self, key: i32, value: Blob) {
+        // push the blob onto this key's vector, creating the vector if absent;
+        // the entry API sidesteps borrowing self.store mutably twice
+        self.store.entry(key).or_insert_with(Vec::new).push(value);
+    }
-    pub fn append(&mut self, key: i32, value: Vec<RecordBatch>) {
+    pub fn append(&mut self, key: i32, mut value: Vec<Blob>) {
        let blob = self.remove(key);
        let mut blob = match blob {
            Some(r) => r,
-            None => Blob::RecordBatchBlob(Vec::new()),
+            None => vec![],
        };
-        blob.append_records(value);
+        blob.append(&mut value);
        self.store.insert(key, blob);
    }
-    pub fn remove(&mut self, key: i32) -> Option<Blob> {
+    pub fn remove(&mut self, key: i32) -> Option<Vec<Blob>> {
        self.store.remove(&key)
        // let x = self.store.remove(&key).unwrap().value();
        // Some(x)
    }
+    pub fn get(&mut self, key: i32) -> Option<&Vec<Blob>> {
+        self.store.get(&key)
+    }
}
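A minimal usage sketch (not part of this commit) of the new per-key Vec<Blob> semantics: each worker thread inserts its own blob under the pipeline's key, and the finalize step drains the whole vector; the key and batch arguments are placeholders:

use arrow::record_batch::RecordBatch;
use vayu_common::store::{Blob, Store};

fn example(batch_from_thread_a: RecordBatch, batch_from_thread_b: RecordBatch) {
    let mut store = Store::new();
    // each thread parks its output under the shared pipeline key
    store.insert(7, Blob::RecordBatchBlob(batch_from_thread_a));
    store.insert(7, Blob::RecordBatchBlob(batch_from_thread_b));
    // finalize: take every blob accumulated for key 7 and combine them
    let blobs = store.remove(7).unwrap();
    assert_eq!(blobs.len(), 2);
}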
43 changes: 37 additions & 6 deletions vayu/src/df2vayu.rs
@@ -1,3 +1,4 @@
+use crate::dummy;
use crate::operators::aggregate::AggregateOperator;
use crate::operators::filter::FilterOperator;
use crate::operators::join::HashProbeOperator;
@@ -27,7 +28,9 @@ use std::sync::Arc;
use vayu_common::VayuPipeline;

pub fn df2vayu(plan: Arc<dyn ExecutionPlan>, store: &mut Store, pipeline_id: i32) -> VayuPipeline {
+    // keep an owned handle to the full plan so it can be rebuilt with
+    // placeholder children further down
+    let plan2 = plan.clone();
    let p = plan.as_any();
+
    let batch_size = 1024;
    let config = SessionConfig::new().with_batch_size(batch_size);
    let ctx = Arc::new(SessionContext::new_with_config(config));
@@ -76,25 +79,53 @@ pub fn df2vayu(plan: Arc<dyn ExecutionPlan>, store: &mut Store, pipeline_id: i32
        return pipeline;
    }
    if let Some(exec) = p.downcast_ref::<HashJoinExec>() {
+        let exec2 = exec.clone();
        // this function will only be called for the probe side;
        // the build side won't have a HashJoinExec in the make_pipeline call

+        // let dummy = exec.left().execute(0, context.clone());
        let mut pipeline = df2vayu(exec.right().clone(), store, pipeline_id);
        println!("adding hashprobe");

-        let mut hashjoinstream = exec.get_hash_join_stream(0, context).unwrap();
-        println!("got joinstream");
+        // swap both children for inert DummyExec placeholders so the plan can
+        // be rebuilt (and later executed) without pulling from the real inputs
+        let tt = dummy::DummyExec::new(
+            exec.properties().clone(),
+            exec.statistics().unwrap(),
+            exec.left().schema(),
+        );
+        let tt2 = dummy::DummyExec::new(
+            exec.properties().clone(),
+            exec.statistics().unwrap(),
+            exec.right().schema(),
+        );
+        let x = plan2
+            .with_new_children(vec![Arc::new(tt), Arc::new(tt2)])
+            .unwrap();
+        let x1 = x.as_any();
+        let exec = if let Some(exec) = x1.downcast_ref::<HashJoinExec>() {
+            exec
+        } else {
+            panic!("expected HashJoinExec after with_new_children");
+        };
        // using uuid but this value would be present in HashProbeExec itself
        // TODO: remove from the correct key
-        let build_map = store.remove(42).unwrap();
-        let left_data = Arc::new(build_map.get_map());
+        println!("{:?}", store.store.keys());
+        // take the build side's blobs out of the store, but put a clone back
+        // so other probe pipelines can still find the hash map under key 42
+        let mut build_map = store.remove(42).unwrap();
+        let cmap = build_map.clone();
+        store.append(42, cmap);
+        let map = build_map.remove(0);
+        let build_map = match map {
+            vayu_common::store::Blob::HashMapBlob(map) => map,
+            _ => panic!("expected a HashMapBlob for the join build side"),
+        };
+        let c = build_map.clone();
+        let left_data = Arc::new(build_map);
        let visited_left_side = BooleanBufferBuilder::new(0);

+        let mut hashjoinstream = exec.get_hash_join_stream(0, context).unwrap();
        hashjoinstream.build_side = BuildSide::Ready(BuildSideReadyState {
            left_data,
            visited_left_side,
        });
+        println!("got joinstream");
        let tt = Box::new(HashProbeOperator::new(hashjoinstream));
        pipeline.operators.push(tt);
        return pipeline;
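For context, a sketch (not part of this commit) of the build-side producer this code pairs with: after the build pipeline finishes, its JoinLeftData is parked in the store under the same hard-coded key 42 that the probe side removes above. The function itself is hypothetical; only the Store/Blob API is from this repo:

use datafusion::physical_plan::joins::hash_join::JoinLeftData;
use vayu_common::store::{Blob, Store};

// Hypothetical build-side sink matching the probe-side lookup above.
fn store_build_side(store: &mut Store, left_data: JoinLeftData) {
    // key 42 mirrors the TODO above: it should really come from the probe exec
    store.insert(42, Blob::HashMapBlob(left_data));
}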
109 changes: 109 additions & 0 deletions vayu/src/dummy.rs
@@ -0,0 +1,109 @@
use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use arrow::datatypes::{Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion::common::Result;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::metrics::MetricsSet;
use datafusion::physical_plan::{
    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, RecordBatchStream,
    SendableRecordBatchStream, Statistics,
};
use futures::stream::Stream;

#[derive(Debug)]
pub struct DummyExec {
    cache: PlanProperties,
    statistics: Statistics,
    schema: Arc<Schema>,
}

impl DummyExec {
    pub fn new(cache: PlanProperties, statistics: Statistics, schema: Arc<Schema>) -> Self {
        DummyExec {
            cache,
            statistics,
            schema,
        }
    }
}
impl ExecutionPlan for DummyExec {
    /// Return a reference to Any that can be used for downcasting
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // a placeholder has no real children, so new children are ignored
        Ok(self)
    }

    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
        vec![]
    }

    fn maintains_input_order(&self) -> Vec<bool> {
        vec![]
    }

    fn execute(
        &self,
        _partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        // return a stream that never yields batches; DummyExec only stands in
        // for children whose output is supplied elsewhere
        let stream = DummyStream {
            schema: self.schema.clone(),
        };
        Ok(Box::pin(stream))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        None
    }

    fn statistics(&self) -> Result<Statistics> {
        Ok(self.statistics.clone())
    }
}

impl DisplayAs for DummyExec {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "DummyExec")
    }
}

struct DummyStream {
    schema: Arc<Schema>,
}

impl Stream for DummyStream {
    type Item = Result<RecordBatch>;

    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // never produces data and never wakes: this stream is a placeholder
        // that is not expected to actually be polled
        Poll::Pending
    }
}

impl RecordBatchStream for DummyStream {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}
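A brief usage sketch (not part of this commit) of how df2vayu uses DummyExec: swap a plan's real children for placeholders so that with_new_children yields an executable node that never pulls from its inputs. The helper name is hypothetical; the API calls mirror the df2vayu code above:

use std::sync::Arc;
use datafusion::physical_plan::ExecutionPlan;
use crate::dummy::DummyExec;

// Replace every child of `plan` with an inert DummyExec carrying the same
// schema; the rebuilt plan keeps its properties but has no real inputs.
fn with_dummy_children(plan: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
    let placeholders: Vec<Arc<dyn ExecutionPlan>> = plan
        .children()
        .iter()
        .map(|child| {
            Arc::new(DummyExec::new(
                plan.properties().clone(),
                plan.statistics().unwrap(),
                child.schema(),
            )) as Arc<dyn ExecutionPlan>
        })
        .collect();
    plan.with_new_children(placeholders).unwrap()
}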
