From 5aaaf7585d699642d8451ca11a84b61cf2c3de9d Mon Sep 17 00:00:00 2001 From: Yash Kothari Date: Wed, 6 Mar 2024 12:30:59 -0500 Subject: [PATCH] added vayu crate --- .vscode/settings.json | 5 ++++ Cargo.toml | 10 ++++---- src/lib.rs | 1 + src/main.rs | 3 --- vayu/Cargo.toml | 12 +++++++++ vayu/src/lib.rs | 25 ++++++++++++++++++ vayu/src/operators.rs | 2 ++ vayu/src/operators/filter.rs | 49 ++++++++++++++++++++++++++++++++++++ vayu/src/operators/scan.rs | 47 ++++++++++++++++++++++++++++++++++ vayu/src/pipeline.rs | 47 ++++++++++++++++++++++++++++++++++ 10 files changed, 193 insertions(+), 8 deletions(-) create mode 100644 .vscode/settings.json create mode 100644 src/lib.rs delete mode 100644 src/main.rs create mode 100644 vayu/Cargo.toml create mode 100644 vayu/src/lib.rs create mode 100644 vayu/src/operators.rs create mode 100644 vayu/src/operators/filter.rs create mode 100644 vayu/src/operators/scan.rs create mode 100644 vayu/src/pipeline.rs diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..352a626 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,5 @@ +{ + "rust-analyzer.linkedProjects": [ + "./Cargo.toml" + ] +} \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index 6dc1b89..2b2d623 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,4 +1,4 @@ -workspace = { members = ["datafusion-integration"] } +workspace = { members = ["datafusion-integration", "vayu"] } [package] name = "ee2" version = "0.1.0" @@ -7,7 +7,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -substrait = "0.24.1" -prost = "0.12" -prost-types = "0.12" -protoc-rust = "2.28.0" +arrow = { version = "50.0.0"} +datafusion = { version = "35.0.0"} +datafusion-proto = { version = "35.0.0"} +tokio = { version = "1.4.0", features = ["rt", "rt-multi-thread", "macros"] } diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1 @@ + diff --git a/src/main.rs b/src/main.rs deleted file mode 100644 index e7a11a9..0000000 --- a/src/main.rs +++ /dev/null @@ -1,3 +0,0 @@ -fn main() { - println!("Hello, world!"); -} diff --git a/vayu/Cargo.toml b/vayu/Cargo.toml new file mode 100644 index 0000000..7697667 --- /dev/null +++ b/vayu/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "vayu" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +arrow = { version = "50.0.0"} +datafusion = { version = "35.0.0"} +tokio = { version = "1.4.0", features = ["rt", "rt-multi-thread", "macros"] } + diff --git a/vayu/src/lib.rs b/vayu/src/lib.rs new file mode 100644 index 0000000..10bdaaf --- /dev/null +++ b/vayu/src/lib.rs @@ -0,0 +1,25 @@ +use core::panic; + +use arrow::error::Result; +use arrow::{array::RecordBatch, util::pretty}; +use pipeline::Pipeline; +pub mod operators; +pub mod pipeline; +pub fn execute(pipeline: Pipeline) -> Result> { + // pipeline + println!("source is {}", pipeline.source_operator.is_some()); + println!("operators len is {}", pipeline.operators.len()); + let mut data = if let Some(so) = pipeline.source_operator { + let data = so.get_data().unwrap(); + println!("Pipeline source {}", so.name()); + data + } else { + panic!("no source operator") + }; + for x in pipeline.operators { + let mut data1 = x.execute(&data).unwrap(); + println!("{}", x.name()); + data = data1; + } + Ok(vec![data]) +} diff --git a/vayu/src/operators.rs b/vayu/src/operators.rs new file mode 100644 index 0000000..04538e7 --- /dev/null +++ b/vayu/src/operators.rs @@ -0,0 +1,2 @@ +pub mod filter; +pub mod scan; diff --git a/vayu/src/operators/filter.rs b/vayu/src/operators/filter.rs new file mode 100644 index 0000000..83a7253 --- /dev/null +++ b/vayu/src/operators/filter.rs @@ -0,0 +1,49 @@ +use crate::pipeline::{IntermediateOperator, PhysicalOperator}; +use datafusion::datasource::file_format::file_compression_type::FileCompressionType; +use datafusion::datasource::physical_plan::FileStream; +use datafusion::datasource::physical_plan::{CsvConfig, CsvOpener}; +use datafusion::physical_plan::{common, SendableRecordBatchStream}; + +use arrow::compute::filter_record_batch; +use datafusion::arrow::array::RecordBatch; +use datafusion::common::cast::as_boolean_array; +use datafusion::datasource::physical_plan::FileScanConfig; +use datafusion::error::Result; +use datafusion::physical_plan::{ + functions, ColumnStatistics, Partitioning, PhysicalExpr, Statistics, WindowExpr, +}; +use std::sync::Arc; +use tokio::task; +pub struct FilterOperator { + predicate: Arc, +} +impl FilterOperator { + pub fn new(predicate: Arc) -> FilterOperator { + FilterOperator { predicate } + } +} + +impl IntermediateOperator for FilterOperator { + fn execute(&self, input: &RecordBatch) -> Result { + let output = filter_record_batch( + &input, + as_boolean_array( + &self + .predicate + .evaluate(&input) + .unwrap() + .into_array(1024) + .unwrap(), + ) + .unwrap(), + ) + .unwrap(); + Ok(output) + } +} + +impl PhysicalOperator for FilterOperator { + fn name(&self) -> String { + String::from("filter") + } +} diff --git a/vayu/src/operators/scan.rs b/vayu/src/operators/scan.rs new file mode 100644 index 0000000..29d24a7 --- /dev/null +++ b/vayu/src/operators/scan.rs @@ -0,0 +1,47 @@ +use crate::pipeline::{PhysicalOperator, Source}; +use datafusion::datasource::file_format::file_compression_type::FileCompressionType; +use datafusion::datasource::physical_plan::FileStream; +use datafusion::datasource::physical_plan::{CsvConfig, CsvOpener}; +use datafusion::physical_plan::{common, SendableRecordBatchStream}; + +use datafusion::arrow::array::RecordBatch; +use datafusion::datasource::physical_plan::FileScanConfig; +use datafusion::error::Result; +use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; +use std::str::FromStr; +use std::sync::Arc; +use tokio::task; +pub struct ScanOperator { + csvconfig: CsvConfig, + pub fileconfig: FileScanConfig, +} +impl ScanOperator { + pub fn new(csvconfig: CsvConfig, fileconfig: FileScanConfig) -> ScanOperator { + ScanOperator { + csvconfig, + fileconfig, + } + } +} + +impl Source for ScanOperator { + fn get_data(&self) -> Result { + let conf = Arc::new(self.csvconfig.clone()); + let opener = CsvOpener::new(conf, FileCompressionType::UNCOMPRESSED); + let stream = FileStream::new(&self.fileconfig, 0, opener, &ExecutionPlanMetricsSet::new())?; + let temp = Box::pin(stream) as SendableRecordBatchStream; + let temp_chunks = task::block_in_place(|| { + tokio::runtime::Runtime::new() + .unwrap() + .block_on(common::collect(temp)) + }) + .unwrap(); + Ok(temp_chunks[0].clone()) + } +} + +impl PhysicalOperator for ScanOperator { + fn name(&self) -> String { + String::from("scan") + } +} diff --git a/vayu/src/pipeline.rs b/vayu/src/pipeline.rs new file mode 100644 index 0000000..a6442e5 --- /dev/null +++ b/vayu/src/pipeline.rs @@ -0,0 +1,47 @@ +use datafusion::arrow::array::RecordBatch; +use datafusion::arrow::datatypes::Schema; +use datafusion::error::Result; +use std::sync::Arc; + +pub struct Pipeline { + pub source_operator: Option>, + pub sink_operator: Option>, + pub operators: Vec>, + pub state: PipelineState, +} +pub struct PipelineState { + pub schema: Option>, +} +impl Pipeline { + pub fn new() -> Pipeline { + Pipeline { + source_operator: None, + sink_operator: None, + operators: vec![], + state: PipelineState { schema: None }, + } + } +} +pub trait PhysicalOperator { + // fn is_sink(&self) -> bool; + // fn is_source(&self) -> bool; + fn name(&self) -> String; +} + +//Operators that implement Sink trait consume data +pub trait Sink: PhysicalOperator { + // Sink method is called constantly with new input, as long as new input is available + fn sink(&self, chunk: &mut RecordBatch) -> bool; +} + +//Operators that implement Source trait emit data +pub trait Source: PhysicalOperator { + fn get_data(&self) -> Result; +} + +//Physical operators that implement the Operator trait process data +pub trait IntermediateOperator: PhysicalOperator { + //takes an input chunk and outputs another chunk + //for example in Projection Operator we appply the expression to the input chunk and produce the output chunk + fn execute(&self, input: &RecordBatch) -> Result; +}