
added vayu crate
yashkothari42 committed Mar 6, 2024
1 parent 533c7fb commit 5aaaf75
Showing 10 changed files with 193 additions and 8 deletions.
5 changes: 5 additions & 0 deletions .vscode/settings.json
@@ -0,0 +1,5 @@
{
  "rust-analyzer.linkedProjects": [
    "./Cargo.toml"
  ]
}
10 changes: 5 additions & 5 deletions Cargo.toml
@@ -1,4 +1,4 @@
workspace = { members = ["datafusion-integration"] }
workspace = { members = ["datafusion-integration", "vayu"] }
[package]
name = "ee2"
version = "0.1.0"
@@ -7,7 +7,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
substrait = "0.24.1"
prost = "0.12"
prost-types = "0.12"
protoc-rust = "2.28.0"
arrow = { version = "50.0.0"}
datafusion = { version = "35.0.0"}
datafusion-proto = { version = "35.0.0"}
tokio = { version = "1.4.0", features = ["rt", "rt-multi-thread", "macros"] }
1 change: 1 addition & 0 deletions src/lib.rs
@@ -0,0 +1 @@

3 changes: 0 additions & 3 deletions src/main.rs

This file was deleted.

12 changes: 12 additions & 0 deletions vayu/Cargo.toml
@@ -0,0 +1,12 @@
[package]
name = "vayu"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
arrow = { version = "50.0.0"}
datafusion = { version = "35.0.0"}
tokio = { version = "1.4.0", features = ["rt", "rt-multi-thread", "macros"] }

25 changes: 25 additions & 0 deletions vayu/src/lib.rs
@@ -0,0 +1,25 @@
use arrow::array::RecordBatch;
use arrow::error::Result;
use pipeline::Pipeline;

pub mod operators;
pub mod pipeline;

/// Runs a pipeline to completion: pulls a batch from the source operator,
/// then pushes it through each intermediate operator in order.
pub fn execute(pipeline: Pipeline) -> Result<Vec<RecordBatch>> {
    println!("source is {}", pipeline.source_operator.is_some());
    println!("operators len is {}", pipeline.operators.len());
    let mut data = if let Some(so) = pipeline.source_operator {
        let data = so.get_data().unwrap();
        println!("Pipeline source {}", so.name());
        data
    } else {
        panic!("no source operator")
    };
    for op in pipeline.operators {
        data = op.execute(&data).unwrap();
        println!("{}", op.name());
    }
    Ok(vec![data])
}
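For context, and not part of the commit: a caller can drive execute and display the resulting batches with arrow's pretty-printing utility. A minimal sketch:

use arrow::util::pretty;
use vayu::pipeline::Pipeline;

// Run a fully assembled pipeline and display its output batches.
fn run(pipeline: Pipeline) {
    let results = vayu::execute(pipeline).unwrap();
    pretty::print_batches(&results).unwrap();
}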
2 changes: 2 additions & 0 deletions vayu/src/operators.rs
@@ -0,0 +1,2 @@
pub mod filter;
pub mod scan;
49 changes: 49 additions & 0 deletions vayu/src/operators/filter.rs
@@ -0,0 +1,49 @@
use crate::pipeline::{IntermediateOperator, PhysicalOperator};

use arrow::compute::filter_record_batch;
use datafusion::arrow::array::RecordBatch;
use datafusion::common::cast::as_boolean_array;
use datafusion::error::Result;
use datafusion::physical_plan::PhysicalExpr;
use std::sync::Arc;

pub struct FilterOperator {
    predicate: Arc<dyn PhysicalExpr>,
}

impl FilterOperator {
    pub fn new(predicate: Arc<dyn PhysicalExpr>) -> FilterOperator {
        FilterOperator { predicate }
    }
}

impl IntermediateOperator for FilterOperator {
    fn execute(&self, input: &RecordBatch) -> Result<RecordBatch> {
        // Evaluate the predicate, materialize it as a boolean array sized to
        // the input batch (not a hard-coded 1024 rows), and keep the rows
        // where it holds.
        let mask = self
            .predicate
            .evaluate(input)?
            .into_array(input.num_rows())?;
        let output = filter_record_batch(input, as_boolean_array(&mask)?)?;
        Ok(output)
    }
}

impl PhysicalOperator for FilterOperator {
    fn name(&self) -> String {
        String::from("filter")
    }
}
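As a usage note (an illustrative sketch, not part of the commit): a predicate such as a > 10 can be assembled from DataFusion 35's physical expression builders; the one-column schema here is a stand-in.

use std::sync::Arc;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::error::Result;
use datafusion::logical_expr::Operator;
use datafusion::physical_expr::expressions::{col, lit, BinaryExpr};
use datafusion::physical_plan::PhysicalExpr;

// Build the physical predicate `a > 10` against an Int32 column `a`.
fn sample_predicate() -> Result<Arc<dyn PhysicalExpr>> {
    let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    let a = col("a", &schema)?;
    Ok(Arc::new(BinaryExpr::new(a, Operator::Gt, lit(10i32))))
}

FilterOperator::new(sample_predicate()?) then filters each incoming batch down to the matching rows.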
47 changes: 47 additions & 0 deletions vayu/src/operators/scan.rs
@@ -0,0 +1,47 @@
use crate::pipeline::{PhysicalOperator, Source};
use datafusion::arrow::array::RecordBatch;
use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
use datafusion::datasource::physical_plan::{CsvConfig, CsvOpener, FileScanConfig, FileStream};
use datafusion::error::Result;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::{common, SendableRecordBatchStream};
use std::sync::Arc;
use tokio::task;

pub struct ScanOperator {
    csvconfig: CsvConfig,
    pub fileconfig: FileScanConfig,
}

impl ScanOperator {
    pub fn new(csvconfig: CsvConfig, fileconfig: FileScanConfig) -> ScanOperator {
        ScanOperator {
            csvconfig,
            fileconfig,
        }
    }
}

impl Source for ScanOperator {
    fn get_data(&self) -> Result<RecordBatch> {
        let conf = Arc::new(self.csvconfig.clone());
        let opener = CsvOpener::new(conf, FileCompressionType::UNCOMPRESSED);
        let stream = FileStream::new(&self.fileconfig, 0, opener, &ExecutionPlanMetricsSet::new())?;
        let stream = Box::pin(stream) as SendableRecordBatchStream;
        // Block on the async file stream; note this builds a fresh Tokio
        // runtime on every call, and only the first batch is returned.
        let batches = task::block_in_place(|| {
            tokio::runtime::Runtime::new()
                .unwrap()
                .block_on(common::collect(stream))
        })?;
        Ok(batches[0].clone())
    }
}

impl PhysicalOperator for ScanOperator {
    fn name(&self) -> String {
        String::from("scan")
    }
}
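One design note on get_data: constructing a new Tokio runtime per call is costly. A sketch of an alternative, assuming the caller is already on a multi-threaded Tokio runtime, reuses the current handle and returns every batch rather than only the first:

use datafusion::arrow::array::RecordBatch;
use datafusion::error::Result;
use datafusion::physical_plan::{common, SendableRecordBatchStream};
use tokio::runtime::Handle;
use tokio::task;

// Drain a record-batch stream synchronously without building a new runtime.
fn collect_blocking(stream: SendableRecordBatchStream) -> Result<Vec<RecordBatch>> {
    task::block_in_place(|| Handle::current().block_on(common::collect(stream)))
}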
47 changes: 47 additions & 0 deletions vayu/src/pipeline.rs
@@ -0,0 +1,47 @@
use datafusion::arrow::array::RecordBatch;
use datafusion::arrow::datatypes::Schema;
use datafusion::error::Result;
use std::sync::Arc;

pub struct Pipeline {
    pub source_operator: Option<Box<dyn Source>>,
    pub sink_operator: Option<Box<dyn Sink>>,
    pub operators: Vec<Box<dyn IntermediateOperator>>,
    pub state: PipelineState,
}

pub struct PipelineState {
    pub schema: Option<Arc<Schema>>,
}

impl Pipeline {
    pub fn new() -> Pipeline {
        Pipeline {
            source_operator: None,
            sink_operator: None,
            operators: vec![],
            state: PipelineState { schema: None },
        }
    }
}

pub trait PhysicalOperator {
    // fn is_sink(&self) -> bool;
    // fn is_source(&self) -> bool;
    fn name(&self) -> String;
}

// Operators that implement the Sink trait consume data.
pub trait Sink: PhysicalOperator {
    // sink is called repeatedly with new input, as long as input is available.
    fn sink(&self, chunk: &mut RecordBatch) -> bool;
}

// Operators that implement the Source trait emit data.
pub trait Source: PhysicalOperator {
    fn get_data(&self) -> Result<RecordBatch>;
}

// Physical operators that implement the IntermediateOperator trait process
// data: they take an input chunk and produce an output chunk. For example, a
// projection operator applies its expressions to the input chunk to produce
// the output chunk.
pub trait IntermediateOperator: PhysicalOperator {
    fn execute(&self, input: &RecordBatch) -> Result<RecordBatch>;
}
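To make the trait contract concrete, here is a minimal end-to-end sketch (illustrative, not part of the commit; MemorySource is a hypothetical one-batch source):

use std::sync::Arc;
use datafusion::arrow::array::{Int32Array, RecordBatch};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::error::Result;
use vayu::pipeline::{PhysicalOperator, Pipeline, Source};

// Hypothetical one-batch source, useful for exercising the pipeline plumbing.
struct MemorySource {
    batch: RecordBatch,
}

impl Source for MemorySource {
    fn get_data(&self) -> Result<RecordBatch> {
        Ok(self.batch.clone())
    }
}

impl PhysicalOperator for MemorySource {
    fn name(&self) -> String {
        String::from("memory")
    }
}

fn main() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))])?;

    let mut pipeline = Pipeline::new();
    pipeline.source_operator = Some(Box::new(MemorySource { batch }));

    let out = vayu::execute(pipeline)?;
    assert_eq!(out[0].num_rows(), 3);
    Ok(())
}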
