diff --git a/vayu-common/Cargo.toml b/vayu-common/Cargo.toml
index b4e4578..488ea55 100644
--- a/vayu-common/Cargo.toml
+++ b/vayu-common/Cargo.toml
@@ -8,3 +8,5 @@ edition = "2021"
 [dependencies]
 datafusion = { version = "36.0.0", path = "../arrow-datafusion/datafusion/core"}
 arrow = {}
+crossbeam-utils = "0.8.19"
+crossbeam-skiplist = "0.1.3"
diff --git a/vayu-common/src/lib.rs b/vayu-common/src/lib.rs
index 9fcd642..25aba60 100644
--- a/vayu-common/src/lib.rs
+++ b/vayu-common/src/lib.rs
@@ -2,6 +2,8 @@ use arrow::record_batch::RecordBatch;
 use datafusion::common::Result;
 use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
 use std::sync::Arc;
+pub mod store;
+
 pub trait PhysicalOperator {
     fn name(&self) -> String;
 }
diff --git a/vayu-common/src/store.rs b/vayu-common/src/store.rs
new file mode 100644
index 0000000..f65917e
--- /dev/null
+++ b/vayu-common/src/store.rs
@@ -0,0 +1,96 @@
+use arrow::array::RecordBatch;
+use crossbeam_skiplist::SkipMap;
+use crossbeam_utils::thread::scope;
+use datafusion::physical_plan::joins::hash_join::JoinLeftData;
+
+use core::panic;
+use std::collections::HashMap;
+use std::hash::Hash;
+use std::sync::Arc;
+pub enum Blob {
+    // RecordBatchBlob(Vec<RecordBatch>),
+    HashMapBlob(JoinLeftData),
+}
+
+impl Blob {
+    pub fn get_map(self) -> JoinLeftData {
+        match self {
+            Blob::HashMapBlob(m) => m,
+            _ => panic!("error"),
+        }
+    }
+    // pub fn get_records(self) -> Vec<RecordBatch> {
+    //     match self {
+    //         Blob::RecordBatchBlob(records) => records,
+    //         _ => panic!("error"),
+    //     }
+    // }
+    // pub fn append_records(&mut self, batches: Vec<RecordBatch>) {
+    //     match self {
+    //         Blob::RecordBatchBlob(records) => {
+    //             // TODO: check if schema is same
+    //             records.extend(batches)
+    //         }
+    //         _ => panic!("error"),
+    //     }
+    // }
+}
+
+// right now this is a typedef of HashMap,
+// but we may need something else in the near future
+
+pub struct Store {
+    store: HashMap<i32, Blob>,
+}
+impl Store {
+    pub fn new() -> Store {
+        Store {
+            store: HashMap::new(),
+        }
+    }
+    pub fn insert(&mut self, key: i32, value: Blob) {
+        self.store.insert(key, value);
+    }
+    // pub fn append(&mut self, key: i32, value: Vec<RecordBatch>) {
+    //     let blob = self.remove(key);
+    //     let mut blob = match blob {
+    //         Some(r) => r,
+    //         None => Blob::RecordBatchBlob(Vec::new()),
+    //     };
+    //     blob.append_records(value);
+    //     self.store.insert(key, blob);
+    // }
+    pub fn remove(&mut self, key: i32) -> Option<Blob> {
+        self.store.remove(&key)
+        // let x = self.store.remove(&key).unwrap().value();
+        // Some(x)
+    }
+}
+pub struct Store1 {
+    store: SkipMap<i32, i32>,
+}
+impl Store1 {
+    pub fn new() -> Self {
+        Store1 {
+            store: SkipMap::new(),
+        }
+    }
+    pub fn insert(&mut self, key: i32, value: i32) {
+        self.store.insert(key, value);
+    }
+    // pub fn append(&mut self, key: i32, value: Vec<RecordBatch>) {
+    //     let blob = self.remove(key);
+    //     let mut blob = match blob {
+    //         Some(r) => r,
+    //         None => Blob::RecordBatchBlob(Vec::new()),
+    //     };
+    //     blob.append_records(value);
+    //     self.store.insert(key, blob);
+    // }
+    // pub fn remove(&mut self, key: i32) -> &Option<Arc<Blob>> {
+    //     let value = self.store.remove(&key).unwrap().value();
+    //     Some(value.un)
+    //     // let x = self.store.remove(&key).unwrap().value();
+    //     // Some(x)
+    // }
+}
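A minimal, self-contained sketch (not part of the patch) of the blob-store pattern that store.rs introduces, with Vec<i32> as a hypothetical stand-in for DataFusion's JoinLeftData; the point is that remove hands ownership back to the caller, so a probe pipeline can consume the build-side hash map without cloning it:

```rust
// Standalone sketch of the Store/Blob pattern; Vec<i32> stands in for JoinLeftData.
use std::collections::HashMap;

enum Blob {
    HashMapBlob(Vec<i32>), // hypothetical payload
}

impl Blob {
    fn get_map(self) -> Vec<i32> {
        match self {
            Blob::HashMapBlob(m) => m,
        }
    }
}

struct Store {
    store: HashMap<i32, Blob>,
}

impl Store {
    fn new() -> Store {
        Store { store: HashMap::new() }
    }
    fn insert(&mut self, key: i32, value: Blob) {
        self.store.insert(key, value);
    }
    // remove transfers ownership of the blob back to the caller
    fn remove(&mut self, key: i32) -> Option<Blob> {
        self.store.remove(&key)
    }
}

fn main() {
    let mut store = Store::new();
    store.insert(42, Blob::HashMapBlob(vec![1, 2, 3]));
    let map = store.remove(42).expect("blob stored under uuid 42").get_map();
    assert_eq!(map, vec![1, 2, 3]);
}
```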
"0.1.3" +crossbeam-utils = "0.8.19" +leapfrog = {version = "0.3.0", features = ["stable_alloc"]} + diff --git a/vayu/src/lib.rs b/vayu/src/lib.rs index 6b2816a..6eae71c 100644 --- a/vayu/src/lib.rs +++ b/vayu/src/lib.rs @@ -4,18 +4,24 @@ use vayu_common::DatafusionPipelineWithData; use vayu_common::VayuPipeline; pub mod operators; pub mod pipeline; +use std::sync::{Arc, Mutex}; + pub mod sinks; -pub mod store; -use crate::store::Store; +use vayu_common::store::Store; pub mod df2vayu; pub struct VayuExecutionEngine { + // this is per node store pub store: Store, + // this is global store + pub global_store: Arc>, + // Note: only one of them will survive lets see which } impl VayuExecutionEngine { - pub fn new() -> VayuExecutionEngine { + pub fn new(global_store: Arc>) -> VayuExecutionEngine { VayuExecutionEngine { store: Store::new(), + global_store, } } pub fn sink(&mut self, sink: vayu_common::SchedulerSinkType, result: Vec) { @@ -33,16 +39,25 @@ impl VayuExecutionEngine { // } vayu_common::SchedulerSinkType::BuildAndStoreHashMap(uuid, join_node) => { let mut sink = sinks::HashMapSink::new(uuid, join_node); - let map = sink.build_map(result); + let hashmap = sink.build_map(result); println!("BuildAndStoreHashMap storing in uuid {uuid}"); - self.store.insert(uuid, map.unwrap()); + + // old store + // self.store.insert(uuid, hashmap.unwrap()); + // new store + let mut map = self.global_store.lock().unwrap(); + map.insert(uuid, hashmap.unwrap()); } }; } pub fn execute(&mut self, pipeline: DatafusionPipelineWithData) { let data = pipeline.data; let sink = pipeline.pipeline.sink; - let mut pipeline: VayuPipeline = df2vayu::df2vayu(pipeline.pipeline.plan, &mut self.store); + + let mut store = self.global_store.lock().unwrap(); + let mut pipeline: VayuPipeline = df2vayu::df2vayu(pipeline.pipeline.plan, &mut store); + drop(store); + pipeline.sink = sink; self.execute_internal(pipeline, data); diff --git a/vayu/src/pipeline.rs b/vayu/src/pipeline.rs index 5777071..8b13789 100644 --- a/vayu/src/pipeline.rs +++ b/vayu/src/pipeline.rs @@ -1,22 +1 @@ -use crate::operators::filter::FilterOperator; -use crate::operators::join::HashProbeOperator; -use crate::operators::projection::ProjectionOperator; -use crate::pipeline; -use crate::store::Store; -use arrow::array::BooleanBufferBuilder; -use core::panic; -use datafusion::arrow::array::RecordBatch; -use datafusion::datasource::physical_plan::CsvExec; -use datafusion::error::Result; -use datafusion::execution::context::SessionContext; -use datafusion::execution::SendableRecordBatchStream; -use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; -use datafusion::physical_plan::filter::FilterExec; -use datafusion::physical_plan::joins::hash_join::BuildSide; -use datafusion::physical_plan::joins::hash_join::BuildSideReadyState; -use datafusion::physical_plan::joins::HashJoinExec; -use datafusion::physical_plan::projection::ProjectionExec; -use datafusion::physical_plan::repartition::RepartitionExec; -use datafusion::physical_plan::ExecutionPlan; -use datafusion::prelude::SessionConfig; -use std::sync::Arc; + diff --git a/vayu/src/sinks.rs b/vayu/src/sinks.rs index 731288f..0965460 100644 --- a/vayu/src/sinks.rs +++ b/vayu/src/sinks.rs @@ -1,10 +1,10 @@ use datafusion::physical_plan::joins::hash_join; use datafusion::physical_plan::ExecutionPlan; -use crate::store::Blob; use crate::RecordBatch; use ahash::RandomState; use arrow::datatypes::Schema; +use vayu_common::store::Blob; use datafusion::execution::memory_pool::MemoryConsumer; use 
diff --git a/vayu/src/pipeline.rs b/vayu/src/pipeline.rs
index 5777071..8b13789 100644
--- a/vayu/src/pipeline.rs
+++ b/vayu/src/pipeline.rs
@@ -1,22 +1 @@
-use crate::operators::filter::FilterOperator;
-use crate::operators::join::HashProbeOperator;
-use crate::operators::projection::ProjectionOperator;
-use crate::pipeline;
-use crate::store::Store;
-use arrow::array::BooleanBufferBuilder;
-use core::panic;
-use datafusion::arrow::array::RecordBatch;
-use datafusion::datasource::physical_plan::CsvExec;
-use datafusion::error::Result;
-use datafusion::execution::context::SessionContext;
-use datafusion::execution::SendableRecordBatchStream;
-use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
-use datafusion::physical_plan::filter::FilterExec;
-use datafusion::physical_plan::joins::hash_join::BuildSide;
-use datafusion::physical_plan::joins::hash_join::BuildSideReadyState;
-use datafusion::physical_plan::joins::HashJoinExec;
-use datafusion::physical_plan::projection::ProjectionExec;
-use datafusion::physical_plan::repartition::RepartitionExec;
-use datafusion::physical_plan::ExecutionPlan;
-use datafusion::prelude::SessionConfig;
-use std::sync::Arc;
+
diff --git a/vayu/src/sinks.rs b/vayu/src/sinks.rs
index 731288f..0965460 100644
--- a/vayu/src/sinks.rs
+++ b/vayu/src/sinks.rs
@@ -1,10 +1,10 @@
 use datafusion::physical_plan::joins::hash_join;
 use datafusion::physical_plan::ExecutionPlan;

-use crate::store::Blob;
 use crate::RecordBatch;
 use ahash::RandomState;
 use arrow::datatypes::Schema;
+use vayu_common::store::Blob;

 use datafusion::execution::memory_pool::MemoryConsumer;
 use datafusion::physical_expr::PhysicalExprRef;
diff --git a/vayu/src/store.rs b/vayu/src/store.rs
deleted file mode 100644
index 6cd8e08..0000000
--- a/vayu/src/store.rs
+++ /dev/null
@@ -1,62 +0,0 @@
-use arrow::array::RecordBatch;
-use datafusion::physical_plan::joins::hash_join::JoinLeftData;
-
-use core::panic;
-use std::collections::HashMap;
-pub enum Blob {
-    RecordBatchBlob(Vec<RecordBatch>),
-    HashMapBlob(JoinLeftData),
-}
-
-impl Blob {
-    pub fn get_map(self) -> JoinLeftData {
-        match self {
-            Blob::HashMapBlob(m) => m,
-            _ => panic!("error"),
-        }
-    }
-    pub fn get_records(self) -> Vec<RecordBatch> {
-        match self {
-            Blob::RecordBatchBlob(records) => records,
-            _ => panic!("error"),
-        }
-    }
-    pub fn append_records(&mut self, batches: Vec<RecordBatch>) {
-        match self {
-            Blob::RecordBatchBlob(records) => {
-                // TODO: check if schema is same
-                records.extend(batches)
-            }
-            _ => panic!("error"),
-        }
-    }
-}
-
-// right now this is typedef of HashMap,
-// but we may need something else in near future
-
-pub struct Store {
-    store: HashMap<i32, Blob>,
-}
-impl Store {
-    pub fn new() -> Store {
-        Store {
-            store: HashMap::new(),
-        }
-    }
-    pub fn insert(&mut self, key: i32, value: Blob) {
-        self.store.insert(key, value);
-    }
-    pub fn append(&mut self, key: i32, value: Vec<RecordBatch>) {
-        let blob = self.remove(key);
-        let mut blob = match blob {
-            Some(r) => r,
-            None => Blob::RecordBatchBlob(Vec::new()),
-        };
-        blob.append_records(value);
-        self.store.insert(key, blob);
-    }
-    pub fn remove(&mut self, key: i32) -> Option<Blob> {
-        self.store.remove(&key)
-    }
-}
diff --git a/vayuDB/Cargo.toml b/vayuDB/Cargo.toml
index 5df974d..e09f041 100644
--- a/vayuDB/Cargo.toml
+++ b/vayuDB/Cargo.toml
@@ -12,3 +12,7 @@ vayu-common = { version = "0.1.0", path = "../vayu-common"}
 futures = { version = "0.3.28" }
 heapless = "0.8.0"
 crossbeam-channel = "0.5"
+crossbeam-skiplist = "0.1.3"
+crossbeam-utils = "0.8.19"
+leapfrog = {version = "0.3.0", features = ["stable_alloc"]}
+lockfree = "0.5.1"
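The map crates pulled in above are candidates for replacing Arc<Mutex<Store>> with a lock-free structure. A sketch of what crossbeam-skiplist offers: inserts and removals go through &self, so no mutex is needed; the catch (visible in the commented-out Store1::remove) is that remove returns an Entry that only lends the value by reference, which is awkward for a non-Clone payload like JoinLeftData:

```rust
// Why a lock-free map is attractive: crossbeam_skiplist::SkipMap takes
// &self for insert/remove, so workers need no Mutex around the store.
use crossbeam_skiplist::SkipMap;
use std::sync::Arc;
use std::thread;

fn main() {
    let store: Arc<SkipMap<i32, i32>> = Arc::new(SkipMap::new());

    let handles: Vec<_> = (0..4)
        .map(|uuid| {
            let store = Arc::clone(&store);
            thread::spawn(move || {
                store.insert(uuid, uuid * 10); // no lock taken
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }

    // remove returns an Entry; value() only borrows it, which is why
    // moving a non-Clone payload out again is awkward (see Store1)
    let entry = store.remove(&3).expect("inserted above");
    assert_eq!(*entry.value(), 30);
}
```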
diff --git a/vayuDB/src/main.rs b/vayuDB/src/main.rs
index d85bc21..9eef091 100644
--- a/vayuDB/src/main.rs
+++ b/vayuDB/src/main.rs
@@ -1,4 +1,5 @@
 use crossbeam_channel::{bounded, Receiver, Sender};
+use futures::stream::Skip;
 use std::collections::HashMap;
 use std::task::Poll;
 use std::thread;
@@ -6,58 +7,93 @@ use vayu_common::{DatafusionPipeline, DatafusionPipelineWithData};
 mod dummy_tasks;
 mod io_service;
 mod scheduler;
+use crossbeam_skiplist::SkipMap;
+use crossbeam_utils::thread::scope;
+use lockfree;
 use std::sync::mpsc;
+use std::sync::Arc;
+use std::sync::Mutex;
+use vayu_common;
+use vayu_common::store::Store;

 fn round_robin(worker_id: usize, num_threads: usize) -> usize {
     (worker_id + 1) % num_threads
 }

-fn start_worker(receiver: Receiver<DatafusionPipelineWithData>, sender: Sender<bool>) {
+fn start_worker(
+    receiver: Receiver<DatafusionPipelineWithData>,
+    sender: Sender<i32>,
+    global_store: Arc<Mutex<Store>>,
+) {
     // TODO: set cpu affinity
-    let mut executor = vayu::VayuExecutionEngine::new();
+    let mut executor = vayu::VayuExecutionEngine::new(global_store);
     // Receive structs sent over the channel
-    sender.send(true).unwrap();
+    sender.send(0).unwrap();
     while let Ok(pipeline) = receiver.recv() {
         println!("got a pipeline for the thread, executing ...");
         executor.execute(pipeline);
-        sender.send(true).unwrap();
+        sender.send(0).unwrap();
     }
 }
+
 fn main() {
-    let (informer_sender, informer_receiver) = bounded(0);
-    let num_threads = 1;
+    // we can use one of these crates if it provides a lockfree concurrent hashmap
+    // let global_store: SkipMap<i32, Blob> = SkipMap::new();
+    // let global_store: lockfree::map::Map<i32, Blob> = lockfree::map::Map::default();
+    // let global_store: leapfrog::LeapMap<i32, Blob> = leapfrog::LeapMap::new();
+
+    // number of threads to run in parallel
+    let num_threads = 2;
+    // global store for intermediate hash maps (and other state)
+    // TODO: change this to use a spinlock and eventually something better
+    let global_store: Arc<Mutex<Store>> = Arc::new(Mutex::new(Store::new()));
+
+    // this MPSC queue is used by the workers to inform the main thread that
+    // 1. a pipeline has finished
+    // 2. they are requesting new work
+    let (informer_sender, informer_receiver): (Sender<i32>, Receiver<i32>) = bounded(0);
+
+    // vector to store the main_thread->worker channels
     let mut senders: Vec<Sender<DatafusionPipelineWithData>> = Vec::new();
+
     for thread_num in 0..num_threads {
-        // create a bounded channel to send data from main thread to worker thread
-        // TODO: it is mpmc right, use some optimized spsc lockfree queue
-        let (sender, receiver) = bounded(100);
-        println!("spawning a new thread {thread_num}");
-        // store the sender for future use
+        // channel to send the pipeline and data from the main thread to the worker thread
+        // TODO: it is mpmc right now; use some optimized spsc lockfree queue
+        let (sender, receiver) = bounded(1);
         senders.push(sender);
+
+        println!("spawning a new thread {thread_num}");
+
+        let global_store_clone = Arc::clone(&global_store);
         let informer_sender_clone = informer_sender.clone();
-        // start worker thread which will keep looking for new entries in the bounded channel
+
+        // start a worker thread which will keep looking for new entries in the channel
         thread::spawn(move || {
-            start_worker(receiver, informer_sender_clone);
+            start_worker(receiver, informer_sender_clone, global_store_clone);
         });
     }
-    println!("number of workers {}", senders.len());
+
+    println!("total number of workers {}", senders.len());
+
     let mut scheduler = scheduler::Scheduler::new();
     let mut io_service = io_service::IOService::new();
     // TODO: create task_queue - buffer tasks
     let mut worker_id = 0;
-    let mut request_pipeline_map: HashMap<i32, DatafusionPipeline> =
-        HashMap::<i32, DatafusionPipeline>::new();
+    let mut request_pipeline_map: HashMap<i32, DatafusionPipeline> = HashMap::new();
+    // right now a pipeline is assigned to a worker only when the worker is free,
+    // but we will poll some extra pipelines from the scheduler and send them to the io service
+    // so that we can start working on them as soon as any worker is free
     let mut non_assigned_pipelines = 0;

     loop {
         // poll scheduler for a new task
         if non_assigned_pipelines < 10 {
             let pipeline = scheduler.get_pipeline();
-
             if let Poll::Ready(pipeline) = pipeline {
                 non_assigned_pipelines += 1;
                 // TODO: add support for multiple dependent pipeline
                 println!("got a pipeline from scheduler");
                 assert!(pipeline.sink.is_some());
+                // submit the source request to the io service
                 let request_num = io_service.submit_request(pipeline.source);
                 println!("sent the request to the io_service");
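A self-contained sketch of the dispatch protocol wired up above: one bounded(1) channel per worker for pipelines, one shared bounded(0) informer channel back to the main thread, and round-robin assignment. String stands in for DatafusionPipelineWithData:

```rust
// Sketch of the main-thread/worker handshake used in main.rs.
use crossbeam_channel::{bounded, Receiver, Sender};
use std::thread;

fn round_robin(worker_id: usize, num_threads: usize) -> usize {
    (worker_id + 1) % num_threads
}

fn main() {
    let num_threads = 2;
    let (informer_sender, informer_receiver): (Sender<i32>, Receiver<i32>) = bounded(0);
    let mut senders: Vec<Sender<String>> = Vec::new();

    for _ in 0..num_threads {
        let (sender, receiver) = bounded(1);
        senders.push(sender);
        let informer = informer_sender.clone();
        thread::spawn(move || {
            informer.send(0).unwrap(); // "I am free" handshake
            while let Ok(task) = receiver.recv() {
                println!("worker executing {task}");
                informer.send(0).unwrap(); // done, ready for more
            }
        });
    }

    let mut worker_id = 0;
    for i in 0..4 {
        informer_receiver.recv().unwrap(); // wait until some worker is free
        senders[worker_id].send(format!("pipeline {i}")).unwrap();
        worker_id = round_robin(worker_id, num_threads);
    }
    // drain the final "done" messages before exiting
    for _ in 0..num_threads {
        informer_receiver.recv().unwrap();
    }
}
```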
@@ -76,16 +112,18 @@ fn main() {
         // poll io_service for a response
         let response = io_service.poll_response();
         if let Poll::Ready((request_num, data)) = response {
-            // get the pipeline from the local map
             println!("got a response from the io_service");
+            // TODO: handle when a source gives multiple record batches
+            // get the pipeline from the local map
             let pipeline = request_pipeline_map.remove(&request_num).unwrap();
+            // send over the channel
             let msg = DatafusionPipelineWithData { pipeline, data };
             senders[worker_id].send(msg).expect("Failed to send struct");
             println!("sent the pipeline and the data to the worker");
+
             non_assigned_pipelines -= 1;
-            // ASSUMPTION: we will get data in one record batch for one pipeline
             // assign the next pipeline to some other worker
             worker_id = round_robin(worker_id, num_threads);
         }
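For reference, the Poll-based handshake the main loop relies on, with a toy in-memory IOService as a hypothetical stand-in for the real io_service module (whose internals are not part of this patch):

```rust
// Sketch of the submit_request/poll_response pattern; this IOService is a
// hypothetical stand-in that completes every read immediately.
use std::collections::VecDeque;
use std::task::Poll;

struct IOService {
    queue: VecDeque<(usize, String)>, // (request_num, data)
    next_request: usize,
}

impl IOService {
    fn new() -> Self {
        IOService { queue: VecDeque::new(), next_request: 0 }
    }
    fn submit_request(&mut self, source: &str) -> usize {
        let request_num = self.next_request;
        self.next_request += 1;
        // pretend the read completes immediately
        self.queue.push_back((request_num, format!("data from {source}")));
        request_num
    }
    fn poll_response(&mut self) -> Poll<(usize, String)> {
        match self.queue.pop_front() {
            Some(resp) => Poll::Ready(resp),
            None => Poll::Pending,
        }
    }
}

fn main() {
    let mut io = IOService::new();
    let req = io.submit_request("scan.csv");
    if let Poll::Ready((request_num, data)) = io.poll_response() {
        assert_eq!(request_num, req);
        println!("got: {data}");
    }
}
```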