Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit

Permalink
multithreaded working
Browse files Browse the repository at this point in the history
  • Loading branch information
yashkothari42 committed Apr 2, 2024
1 parent e724f92 commit 68a0f63
Show file tree
Hide file tree
Showing 10 changed files with 188 additions and 110 deletions.
2 changes: 2 additions & 0 deletions vayu-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,5 @@ edition = "2021"
[dependencies]
datafusion = { version = "36.0.0", path = "../arrow-datafusion/datafusion/core"}
arrow = {}
crossbeam-utils = "0.8.19"
crossbeam-skiplist = "0.1.3"
2 changes: 2 additions & 0 deletions vayu-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use arrow::record_batch::RecordBatch;
use datafusion::common::Result;
use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
use std::sync::Arc;
pub mod store;

pub trait PhysicalOperator {
fn name(&self) -> String;
}
Expand Down
96 changes: 96 additions & 0 deletions vayu-common/src/store.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use arrow::array::RecordBatch;
use crossbeam_skiplist::SkipMap;
use crossbeam_utils::thread::scope;
use datafusion::physical_plan::joins::hash_join::JoinLeftData;

use core::panic;
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::Arc;
/// A value that can be parked in a [`Store`] between pipeline stages.
///
/// Only hash-join build-side data is supported today; the commented-out
/// variant hints at a planned record-batch payload — TODO confirm.
pub enum Blob {
    // RecordBatchBlob(Vec<RecordBatch>),
    HashMapBlob(JoinLeftData),
}

impl Blob {
    /// Consume the blob and return the hash-join build-side data.
    ///
    /// # Panics
    /// Cannot panic today: `HashMapBlob` is the only variant. The match
    /// below is exhaustive without a wildcard, so adding a new variant
    /// later forces a compile error here instead of a runtime panic —
    /// the original `_ => panic!("error")` arm was an unreachable
    /// pattern (rustc warns on it) and hid that safety net.
    pub fn get_map(self) -> JoinLeftData {
        match self {
            Blob::HashMapBlob(map) => map,
        }
    }
}

// Right now this is a typedef of HashMap<i32, Blob>,
// but we may need something else in the near future.

/// Per-node blob store keyed by an integer id (the callers visible in
/// this commit pass a join uuid).
///
/// Not thread-safe by itself; share it across threads behind a lock
/// (e.g. `Arc<Mutex<Store>>`), as the execution engine in this commit does.
pub struct Store {
    // Kept private so the backing representation can change later.
    store: HashMap<i32, Blob>,
}
impl Store {
pub fn new() -> Store {
Store {
store: HashMap::new(),
}
}
pub fn insert(&mut self, key: i32, value: Blob) {
self.store.insert(key, value);
}
// pub fn append(&mut self, key: i32, value: Vec<RecordBatch>) {
// let blob = self.remove(key);
// let mut blob = match blob {
// Some(r) => r,
// None => Blob::RecordBatchBlob(Vec::new()),
// };
// blob.append_records(value);
// self.store.insert(key, blob);
// }
pub fn remove(&mut self, key: i32) -> Option<Blob> {
self.store.remove(&key)
// let x = self.store.remove(&key).unwrap().value();
// Some(x)
}
}
/// Experimental lock-free alternative to `Store`, backed by a
/// concurrent `SkipMap`. Currently maps `i32 -> i32` only — presumably
/// a placeholder until `Blob` values can be held here; TODO confirm.
pub struct Store1 {
    store: SkipMap<i32, i32>,
}
impl Store1 {
pub fn new() -> Self {
Store1 {
store: SkipMap::new(),
}
}
pub fn insert(&mut self, key: i32, value: i32) {
self.store.insert(key, value);
}
// pub fn append(&mut self, key: i32, value: Vec<RecordBatch>) {
// let blob = self.remove(key);
// let mut blob = match blob {
// Some(r) => r,
// None => Blob::RecordBatchBlob(Vec::new()),
// };
// blob.append_records(value);
// self.store.insert(key, blob);
// }
// pub fn remove(&mut self, key: i32) -> &Option<Arc<Blob>> {
// let value = self.store.remove(&key).unwrap().value();
// Some(value.un)
// // let x = self.store.remove(&key).unwrap().value();
// // Some(x)
// }
}
4 changes: 4 additions & 0 deletions vayu/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,7 @@ futures = { version = "0.3.28" }
datafusion = { version = "36.0.0", path = "../arrow-datafusion/datafusion/core"}
vayu-common = { version = "0.1.0", path = "../vayu-common"}
ahash = "0.8.11"
crossbeam-skiplist = "0.1.3"
crossbeam-utils = "0.8.19"
leapfrog = {version = "0.3.0", features = ["stable_alloc"]}

27 changes: 21 additions & 6 deletions vayu/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,24 @@ use vayu_common::DatafusionPipelineWithData;
use vayu_common::VayuPipeline;
pub mod operators;
pub mod pipeline;
use std::sync::{Arc, Mutex};

pub mod sinks;
pub mod store;
use crate::store::Store;
use vayu_common::store::Store;
pub mod df2vayu;
pub struct VayuExecutionEngine {
// this is per node store
pub store: Store,
// this is global store
pub global_store: Arc<Mutex<Store>>,
// Note: only one of them will survive lets see which
}

impl VayuExecutionEngine {
pub fn new() -> VayuExecutionEngine {
pub fn new(global_store: Arc<Mutex<Store>>) -> VayuExecutionEngine {
VayuExecutionEngine {
store: Store::new(),
global_store,
}
}
pub fn sink(&mut self, sink: vayu_common::SchedulerSinkType, result: Vec<RecordBatch>) {
Expand All @@ -33,16 +39,25 @@ impl VayuExecutionEngine {
// }
vayu_common::SchedulerSinkType::BuildAndStoreHashMap(uuid, join_node) => {
let mut sink = sinks::HashMapSink::new(uuid, join_node);
let map = sink.build_map(result);
let hashmap = sink.build_map(result);
println!("BuildAndStoreHashMap storing in uuid {uuid}");
self.store.insert(uuid, map.unwrap());

// old store
// self.store.insert(uuid, hashmap.unwrap());
// new store
let mut map = self.global_store.lock().unwrap();
map.insert(uuid, hashmap.unwrap());
}
};
}
pub fn execute(&mut self, pipeline: DatafusionPipelineWithData) {
let data = pipeline.data;
let sink = pipeline.pipeline.sink;
let mut pipeline: VayuPipeline = df2vayu::df2vayu(pipeline.pipeline.plan, &mut self.store);

let mut store = self.global_store.lock().unwrap();
let mut pipeline: VayuPipeline = df2vayu::df2vayu(pipeline.pipeline.plan, &mut store);
drop(store);

pipeline.sink = sink;

self.execute_internal(pipeline, data);
Expand Down
23 changes: 1 addition & 22 deletions vayu/src/pipeline.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1 @@
use crate::operators::filter::FilterOperator;
use crate::operators::join::HashProbeOperator;
use crate::operators::projection::ProjectionOperator;
use crate::pipeline;
use crate::store::Store;
use arrow::array::BooleanBufferBuilder;
use core::panic;
use datafusion::arrow::array::RecordBatch;
use datafusion::datasource::physical_plan::CsvExec;
use datafusion::error::Result;
use datafusion::execution::context::SessionContext;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::joins::hash_join::BuildSide;
use datafusion::physical_plan::joins::hash_join::BuildSideReadyState;
use datafusion::physical_plan::joins::HashJoinExec;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionConfig;
use std::sync::Arc;

2 changes: 1 addition & 1 deletion vayu/src/sinks.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use datafusion::physical_plan::joins::hash_join;
use datafusion::physical_plan::ExecutionPlan;

use crate::store::Blob;
use crate::RecordBatch;
use ahash::RandomState;
use arrow::datatypes::Schema;
use vayu_common::store::Blob;

use datafusion::execution::memory_pool::MemoryConsumer;
use datafusion::physical_expr::PhysicalExprRef;
Expand Down
62 changes: 0 additions & 62 deletions vayu/src/store.rs

This file was deleted.

4 changes: 4 additions & 0 deletions vayuDB/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,7 @@ vayu-common = { version = "0.1.0", path = "../vayu-common"}
futures = { version = "0.3.28" }
heapless = "0.8.0"
crossbeam-channel = "0.5"
crossbeam-skiplist = "0.1.3"
crossbeam-utils = "0.8.19"
leapfrog = {version = "0.3.0", features = ["stable_alloc"]}
lockfree = "0.5.1"
Loading

0 comments on commit 68a0f63

Please sign in to comment.