From 71eaf97e6a4f68e42315c3a6af2966fdcfa62219 Mon Sep 17 00:00:00 2001 From: Yash Kothari Date: Wed, 27 Mar 2024 14:08:14 -0400 Subject: [PATCH] push based Vayu Execution Engine with multiple pipelines support (#4) --- .DS_Store | Bin 0 -> 6148 bytes .gitmodules | 3 + .vscode/settings.json | 5 + Cargo.toml | 10 +- arrow-datafusion | 1 + datafusion-integration/.DS_Store | Bin 0 -> 6148 bytes datafusion-integration/Cargo.toml | 16 + datafusion-integration/flamegraph.svg | 491 ++++++++++++++++++ datafusion-integration/src/main.rs | 131 +++++ datafusion-integration/src/util.rs | 61 +++ datafusion-integration/testing/LICENSE.txt | 202 +++++++ datafusion-integration/testing/README.md | 27 + .../testing/data/csv/README.md | 5 + .../testing/data/csv/aggregate_test_100.csv | 101 ++++ .../data/csv/aggregate_test_100.csv.gz | Bin 0 -> 9117 bytes .../testing/data/csv/join_test_A.csv | 3 + .../testing/data/csv/join_test_B.csv | 3 + lighting/Cargo.toml | 8 + lighting/src/main.rs | 14 + src/lib.rs | 1 + src/main.rs | 3 - vayu/Cargo.toml | 14 + vayu/README.md | 85 +++ vayu/src/lib.rs | 79 +++ vayu/src/operators/filter.rs | 27 + vayu/src/operators/join.rs | 54 ++ vayu/src/operators/mod.rs | 4 + vayu/src/operators/projection.rs | 28 + vayu/src/operators/scan.rs | 36 ++ vayu/src/pipeline.rs | 114 ++++ vayu/src/pipeline_executor.rs | 54 ++ vayu/src/sinks.rs | 56 ++ vayu/src/store.rs | 62 +++ 33 files changed, 1691 insertions(+), 7 deletions(-) create mode 100644 .DS_Store create mode 100644 .gitmodules create mode 100644 .vscode/settings.json create mode 160000 arrow-datafusion create mode 100644 datafusion-integration/.DS_Store create mode 100644 datafusion-integration/Cargo.toml create mode 100644 datafusion-integration/flamegraph.svg create mode 100644 datafusion-integration/src/main.rs create mode 100644 datafusion-integration/src/util.rs create mode 100644 datafusion-integration/testing/LICENSE.txt create mode 100644 datafusion-integration/testing/README.md create mode 100644 datafusion-integration/testing/data/csv/README.md create mode 100644 datafusion-integration/testing/data/csv/aggregate_test_100.csv create mode 100644 datafusion-integration/testing/data/csv/aggregate_test_100.csv.gz create mode 100644 datafusion-integration/testing/data/csv/join_test_A.csv create mode 100644 datafusion-integration/testing/data/csv/join_test_B.csv create mode 100644 lighting/Cargo.toml create mode 100644 lighting/src/main.rs create mode 100644 src/lib.rs delete mode 100644 src/main.rs create mode 100644 vayu/Cargo.toml create mode 100644 vayu/README.md create mode 100644 vayu/src/lib.rs create mode 100644 vayu/src/operators/filter.rs create mode 100644 vayu/src/operators/join.rs create mode 100644 vayu/src/operators/mod.rs create mode 100644 vayu/src/operators/projection.rs create mode 100644 vayu/src/operators/scan.rs create mode 100644 vayu/src/pipeline.rs create mode 100644 vayu/src/pipeline_executor.rs create mode 100644 vayu/src/sinks.rs create mode 100644 vayu/src/store.rs diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..d8f70d2b49c76fc98cdc7e360daee935d020c79b GIT binary patch literal 6148 zcmeHKO>fgM7=GP`HQhk!0njc;k+_ah2AYPrbX|Lx1RNFw2SBB1!ibi|BS}|HRi&Qc zKkyg0@=N$HoZxvK4<+e1ph*Z-ekJ?y$NSjz<2QCqM4~tG9}u;O$VOtc){$Le+|Oyr zYP#h*P^d8^wJ9P=V>({)HVsArqrm^B0Drsdv`-^S=m=l#@9*~@l4CED9{Txk*h6an zREAQOlYx0cQHArh!nsX8=E;!W(}YqA!HVe~Fh*;D`4iQz+RI~!J69RYo2I9FE#JJ; zC`_|X=ewxYt!rxyyJ2tGZ`}`a>Sm*ClJ-W)Yp%VJGV(6UBkyGxOoy$T`!dc(VH_li 
z5c&aJ-n|}s_JEtM`FThT6jd%HyT~tx@r4U6;I>bzS)Td{_He$#x&qy3Y zMMfQQlhC;ArJ|$gwWgwsBRZh0;>#;8`dV~`-+NAzirfLWMULo!iJS_|sX{+7L{3M$qy4%X z7dhs168iBW^vpuPP=uNt<2%ZoL^ns18wHF4Wd-VawZ-@U;Lq>>GRf480!D$AQb1UJ zx8K8(^xe9&IKFEgq<2Vc?6=5KQIP5DSQYpx-b7M{K8FjyuEs@1!~d*2QK literal 0 HcmV?d00001 diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..dae32e4 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "arrow-datafusion"] + path = arrow-datafusion + url = git@github.com:yashkothari42/arrow-datafusion.git diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..352a626 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,5 @@ +{ + "rust-analyzer.linkedProjects": [ + "./Cargo.toml" + ] +} \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index 574b3c5..3d2ec1a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,3 +1,7 @@ +[workspace] +members = ["datafusion-integration", "lighting", "vayu"] +exclude = ["arrow-datafusion"] + [package] name = "ee2" version = "0.1.0" @@ -6,7 +10,5 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -substrait = "0.24.1" -prost = "0.12" -prost-types = "0.12" -protoc-rust = "2.28.0" +arrow = { version = "50.0.0"} +tokio = { version = "1.4.0", features = ["rt", "rt-multi-thread", "macros"] } diff --git a/arrow-datafusion b/arrow-datafusion new file mode 160000 index 0000000..4fcb0b8 --- /dev/null +++ b/arrow-datafusion @@ -0,0 +1 @@ +Subproject commit 4fcb0b8f77e0f8190798a77338fb5f1f95ee22b9 diff --git a/datafusion-integration/.DS_Store b/datafusion-integration/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..667a7bfc359c6d0561f8436d28412b5b29c57de8 GIT binary patch literal 6148 zcmeHKK~LK-6n<{Ynt~?vut{7dMdFH}8`Y$0m#nbEt^~2eKqaA77cIo7N$H`gl=Jpu zcHJ-G@38OLE>c_ISPk@^^!z>B-;0yamK_t3>W=&GiJC;@!5J$J6b~5pv#(giwQK;D zd4{GvDoE26r7LV{_#YMEx!a^eifBYb%-sF_eNFXcoaz|;yf{9?_wI}#BC1l3jLRj~ zpCwiut6>0UK?!9vTx6?vPqX7me(m1iZ+x95dC_X!s!FxC@v83Cy)ExN{Glgd5f$US z8;!nm?L_M|p2tyqnhYkr=9@#E6;YB6MkXQY4?sElmSlZB>FROT&rNP)vt57M`Ordq@NqV4cyD+1KOOgerdOH%#ZqALWAz(*?Qjmi5iDtN6OOV} zXG8SNv*v6;Q9u+B1=gj2+kd>Rbqy%vhytR(vr&Ne2OrKDI4muitpkO*0sw1hHinpg z7dgjw7&t5~q6emID$u4Xcg0XP9qqp31rAG#Hl38ad?+`vayJxZM#uAgLnjqjlu{HB z1r`;k+G>-}|1Xc<{}+>_CJKlGFG>MbI|`4w_#}6>u6!JywLbg;&c?jb;(ZDNa}Flame Graph Reset ZoomSearch datafusion_integration`tokio::runtime::builder::Builder::build (1 samples, 6.25%)datafusi..datafusion_integration`tokio::runtime::driver::Driver::new (1 samples, 6.25%)datafusi..libsystem_platform.dylib`_platform_memset (1 samples, 6.25%)libsyste..datafusion_integration`<futures_util::future::either::Either<A,B> as futures_core::stream::Stream>::poll_next (1 samples, 6.25%)datafusi..datafusion_integration`object_store::delimited::LineDelimiter::push (1 samples, 6.25%)datafusi..datafusion_integration`regex_automata::util::pool::inner::Pool<T,F>::get_slow (1 samples, 6.25%)datafusi..datafusion_integration`<regex_automata::meta::strategy::Core as regex_automata::meta::strategy::Strategy>::create_cache (1 samples, 6.25%)datafusi..datafusion_integration`regex_automata::hybrid::dfa::Cache::new (1 samples, 6.25%)datafusi..libsystem_platform.dylib`__bzero (1 samples, 6.25%)libsyste..datafusion_integration`regex_automata::dfa::onepass::Builder::build_from_nfa (1 samples, 6.25%)datafusi..datafusion_integration`<datafusion::datasource::file_format::csv::CsvFormat as 
datafusion::datasource::file_format::FileFormat>::infer_schema::_{{closure}} (4 samples, 25.00%) … [flamegraph.svg: remaining SVG frame text elided — a 16-sample profile of the datafusion_integration binary, almost entirely under tokio::runtime::Runtime::block_on in datafusion_integration::main; the dominant frames are arrow_csv schema inference and decoding, DataFusion SQL parsing, logical/physical planning and optimization, and vayu::execute (2 samples, 12.50%)]
\ No newline at end of file diff --git a/datafusion-integration/src/main.rs b/datafusion-integration/src/main.rs new file mode 100644 index 0000000..07abe5d --- /dev/null +++ b/datafusion-integration/src/main.rs @@ -0,0 +1,131 @@ +use datafusion::error::Result; +use datafusion::physical_plan::ExecutionPlan; +use datafusion::prelude::{CsvReadOptions, SessionContext}; +use std::sync::Arc; +mod util; +use vayu::{sinks, VayuExecutionEngine}; + +#[tokio::main] +async fn main() -> Result<()> { + test_scan_filter_project().await?; + test_hash_join().await?; + test_store_record_batch().await?; + Ok(()) +} + +async fn test_hash_join() -> Result<()> { + let ctx: SessionContext = SessionContext::new(); + // register csv file with the execution context + ctx.register_csv( + "a", + &format!("./testing/data/csv/join_test_A.csv"), + CsvReadOptions::new(), + ) + .await?; + ctx.register_csv( + "b", + &format!("./testing/data/csv/join_test_B.csv"), + CsvReadOptions::new(), + ) + .await?; + // get executor + let mut executor = VayuExecutionEngine::new(); + + let uuid = 1; + // get execution plan from th sql query + let sql = "SELECT * FROM a,b WHERE a.a1 = b.b1 "; + let plan = util::get_execution_plan_from_sql(&ctx, sql).await?; + + // probe plan would be same as main plan + let probe_plan = plan.clone(); + // get join node and build_plan + // Note: build_plan does not have build/join node as that would be part of sink operator + let (join_node, build_plan) = util::get_hash_build_pipeline(plan.clone()); + + // run the build pipeline with the executor + run_pipeline( + &mut executor, + build_plan, + sinks::SchedulerSinkType::BuildAndStoreHashMap(uuid, join_node), + ) + .await; + + // run the probe pipeline with the executor + run_pipeline( + &mut executor, + probe_plan, + sinks::SchedulerSinkType::PrintOutput, + ) + .await; + + Ok(()) +} + +async fn test_scan_filter_project() -> Result<()> { + // create local execution context + let ctx: SessionContext = SessionContext::new(); + // register csv file with the execution context + + ctx.register_csv( + "aggregate_test_100", + &format!("./testing/data/csv/aggregate_test_100.csv"), + CsvReadOptions::new(), + ) + .await?; + let mut executor = VayuExecutionEngine::new(); + let sql = "SELECT c1,c3 as neg,c4 as pos,c13 FROM aggregate_test_100 WHERE (c3 < 0 AND c1='a') OR ( c4 > 0 AND c1='b' ) "; + + let plan = util::get_execution_plan_from_sql(&ctx, sql).await?; + + run_pipeline(&mut executor, plan, sinks::SchedulerSinkType::PrintOutput).await; + + Ok(()) +} + +async fn test_store_record_batch() -> Result<()> { + // create local execution context + let ctx: SessionContext = SessionContext::new(); + // register csv file with the execution context + + ctx.register_csv( + "aggregate_test_100", + &format!("./testing/data/csv/aggregate_test_100.csv"), + CsvReadOptions::new(), + ) + .await?; + let mut executor = VayuExecutionEngine::new(); + let sql = "SELECT c1,c3 as neg,c4 as pos,c13 FROM aggregate_test_100 WHERE c3 < 0 AND c1='a' "; + + let plan = util::get_execution_plan_from_sql(&ctx, sql).await?; + let uuid = 42; + run_pipeline( + &mut executor, + plan, + sinks::SchedulerSinkType::StoreRecordBatch(uuid), + ) + .await; + let sql = + "SELECT c1,c3 as neg,c4 as pos,c13 FROM aggregate_test_100 WHERE c4 > 0 AND c1='b' "; + + let plan = util::get_execution_plan_from_sql(&ctx, sql).await?; + let uuid = 42; + run_pipeline( + &mut executor, + plan, + sinks::SchedulerSinkType::StoreRecordBatch(uuid), + ) + .await; + + // get executor to print the value + executor.sink(uuid); + 
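+    // note: both pipelines above use SchedulerSinkType::StoreRecordBatch with the same
+    // uuid (42), so their result batches are appended under one store entry;
+    // executor.sink(uuid) then removes that blob from the store and pretty-prints it.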
+ Ok(()) +} + +async fn run_pipeline( + executor: &mut VayuExecutionEngine, + plan: Arc, + sink: sinks::SchedulerSinkType, +) { + executor.execute(vayu::SchedulerPipeline::new(plan, sink)); +} diff --git a/datafusion-integration/src/util.rs b/datafusion-integration/src/util.rs new file mode 100644 index 0000000..80412c6 --- /dev/null +++ b/datafusion-integration/src/util.rs @@ -0,0 +1,61 @@ +use core::panic; +use datafusion::datasource::physical_plan::CsvExec; +use datafusion::error::Result; +use datafusion::execution::context::SessionState; +use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; +use datafusion::physical_plan::displayable; +use datafusion::physical_plan::filter::FilterExec; +use datafusion::physical_plan::joins::HashJoinExec; +use datafusion::physical_plan::projection::ProjectionExec; +use datafusion::physical_plan::repartition::RepartitionExec; +use datafusion::physical_plan::ExecutionPlan; +use datafusion::prelude::SessionContext; +use std::sync::Arc; + +/** + * returns (join_node, build_plan) + * Note: build_plan won't have join node + */ +pub fn get_hash_build_pipeline( + plan: Arc, +) -> (Arc, Arc) { + let plan1 = plan.clone(); + let p = plan.as_any(); + + if let Some(exec) = p.downcast_ref::() { + return (plan1, exec.left().clone()); + } + if let Some(_) = p.downcast_ref::() { + panic!("should never reach csvexec in get_hash_build_pipeline "); + } + if let Some(exec) = p.downcast_ref::() { + return get_hash_build_pipeline(exec.input().clone()); + } + if let Some(exec) = p.downcast_ref::() { + return get_hash_build_pipeline(exec.input().clone()); + } + if let Some(exec) = p.downcast_ref::() { + return get_hash_build_pipeline(exec.input().clone()); + } + if let Some(exec) = p.downcast_ref::() { + return get_hash_build_pipeline(exec.input().clone()); + } + panic!("No join node found"); +} + +pub async fn get_execution_plan_from_sql( + ctx: &SessionContext, + sql: &str, +) -> Result> { + // create datafusion logical plan + let logical_plan = SessionState::create_logical_plan(&ctx.state(), sql).await?; + // create datafusion physical plan + let plan = SessionState::create_physical_plan(&ctx.state(), &logical_plan).await?; + // print datafusion physical plan + println!( + "Detailed physical plan:\n{}", + displayable(plan.as_ref()).indent(true) + ); + // panic!("hello"); + Ok(plan) +} diff --git a/datafusion-integration/testing/LICENSE.txt b/datafusion-integration/testing/LICENSE.txt new file mode 100644 index 0000000..d645695 --- /dev/null +++ b/datafusion-integration/testing/LICENSE.txt @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. 
+ + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. 
If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. 
Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/datafusion-integration/testing/README.md b/datafusion-integration/testing/README.md new file mode 100644 index 0000000..8ef154b --- /dev/null +++ b/datafusion-integration/testing/README.md @@ -0,0 +1,27 @@ +## DATAFUSION INTEGRATION + +how to run? +`cargo run` + +what will it do? +one SQL query is hardcoded in main file. +1. It will use datafusion to create a logical plan +2. 
Then it will create a trait based physical plan. +3. Then it will convert it to node based datafusion physical plan +4. It can either run using default datafusion execution engine or cmu execution engine. + + +Idea is to develop the execution as a library and then this crate can be used to test the execution engine by passing the physical node to it. + + + + + + + + Convert Node based plan to DAG + + execute the dag + + + \ No newline at end of file diff --git a/datafusion-integration/testing/data/csv/README.md b/datafusion-integration/testing/data/csv/README.md new file mode 100644 index 0000000..be66724 --- /dev/null +++ b/datafusion-integration/testing/data/csv/README.md @@ -0,0 +1,5 @@ +# CSV Test Files + +| File | Description | +|------|-------------| +|aggregate_test_100.csv | Randomly generated data file with two grouping keys and columns of each data type, suitable for testing aggregate queries. File was generated by https://github.com/andygrove/test-data-generator | diff --git a/datafusion-integration/testing/data/csv/aggregate_test_100.csv b/datafusion-integration/testing/data/csv/aggregate_test_100.csv new file mode 100644 index 0000000..e548b75 --- /dev/null +++ b/datafusion-integration/testing/data/csv/aggregate_test_100.csv @@ -0,0 +1,101 @@ +c1,c2,c3,c4,c5,c6,c7,c8,c9,c10,c11,c12,c13 +c,2,1,18109,2033001162,-6513304855495910254,25,43062,1491205016,5863949479783605708,0.110830784,0.9294097332465232,6WfVFBVGJSQb7FhA7E0lBwdvjfZnSW +d,5,-40,22614,706441268,-7542719935673075327,155,14337,3373581039,11720144131976083864,0.69632107,0.3114712539863804,C2GT5KVyOPZpgKVl110TyZO0NcJ434 +b,1,29,-18218,994303988,5983957848665088916,204,9489,3275293996,14857091259186476033,0.53840446,0.17909035118828576,AyYVExXK6AR2qUTxNZ7qRHQOVGMLcz +a,1,-85,-15154,1171968280,1919439543497968449,77,52286,774637006,12101411955859039553,0.12285209,0.6864391962767343,0keZ5G8BffGwgF2RwQD59TFzMStxCB +b,5,-82,22080,1824882165,7373730676428214987,208,34331,3342719438,3330177516592499461,0.82634634,0.40975383525297016,Ig1QcuKsjHXkproePdERo2w0mYzIqd +b,4,-111,-1967,-4229382,1892872227362838079,67,9832,1243785310,8382489916947120498,0.06563997,0.152498292971736,Sfx0vxv1skzZWT1PqVdoRDdO6Sb6xH +e,3,104,-25136,1738331255,300633854973581194,139,20807,3577318119,13079037564113702254,0.40154034,0.7764360990307122,DuJNG8tufSqW0ZstHqWj3aGvFLMg4A +a,3,13,12613,1299719633,2020498574254265315,191,17835,3998790955,14881411008939145569,0.041445434,0.8813167497816289,Amn2K87Db5Es3dFQO9cw9cvpAM6h35 +d,1,38,18384,-335410409,-1632237090406591229,26,57510,2712615025,1842662804748246269,0.6064476,0.6404495093354053,4HX6feIvmNXBN7XGqgO4YVBkhu8GDI +a,4,-38,20744,762932956,308913475857409919,7,45465,1787652631,878137512938218976,0.7459874,0.02182578039211991,ydkwycaISlYSlEq3TlkS2m15I2pcp8 +d,1,57,28781,-1143802338,2662536767954229885,202,62167,879082834,4338034436871150616,0.7618384,0.42950521730777025,VY0zXmXeksCT8BzvpzpPLbmU9Kp9Y4 +a,4,-54,-2376,434021400,5502271306323260832,113,15777,2502326480,7966148640299601101,0.5720931,0.30585375151301186,KJFcmTVjdkCMv94wYCtfHMFhzyRsmH +e,3,112,-6823,-421042466,8535335158538929274,129,32712,3759340273,9916295859593918600,0.6424343,0.6316565296547284,BsM5ZAYifRh5Lw3Y8X1r53I0cTJnfE +d,2,113,3917,-108973366,-7220140168410319165,197,24380,63044568,4225581724448081782,0.11867094,0.2944158618048994,90gAtmGEeIqUTbo1ZrxCvWtsseukXC +b,1,54,-18410,1413111008,-7145106120930085900,249,5382,1842680163,17818611040257178339,0.8881188,0.24899794314659673,6FPJlLAcaQ5uokyOWZ9HGdLZObFvOZ 
+c,1,103,-22186,431378678,1346564663822463162,146,12393,3766999078,10901819591635583995,0.064453244,0.7784918983501654,2T3wSlHdEmASmO0xcXHnndkKEt6bz8 +e,2,49,24495,-587831330,9178511478067509438,129,12757,1289293657,10948666249269100825,0.5610077,0.5991138115095911,bgK1r6v3BCTh0aejJUhkA1Hn6idXGp +d,1,-98,13630,-1991133944,1184110014998006843,220,2986,225513085,9634106610243643486,0.89651865,0.1640882545084913,y7C453hRWd4E7ImjNDWlpexB8nUqjh +d,3,77,15091,-1302295658,8795481303066536947,154,35477,2093538928,17419098323248948387,0.11952883,0.7035635283169166,O66j6PaYuZhEUtqV6fuU7TyjM2WxC5 +e,2,97,18167,1593800404,-9112448817105133638,163,45185,3188005828,2792105417953811674,0.38175434,0.4094218353587008,ukOiFGGFnQJDHFgZxHMpvhD3zybF0M +e,4,-56,-31500,1544188174,3096047390018154410,220,417,557517119,2774306934041974261,0.15459597,0.19113293583306745,IZTkHMLvIKuiLjhDjYMmIHxh166we4 +d,1,-99,5613,1213926989,-8863698443222021480,19,18736,4216440507,14933742247195536130,0.6067944,0.33639590659276175,aDxBtor7Icd9C5hnTvvw5NrIre740e +a,5,36,-16974,623103518,6834444206535996609,71,29458,141047417,17448660630302620693,0.17100024,0.04429073092078406,OF7fQ37GzaZ5ikA2oMyvleKtgnLjXh +e,4,-53,13788,2064155045,-691093532952651300,243,35106,2778168728,9463973906560740422,0.34515214,0.27159190516490006,0VVIHzxWtNOFLtnhjHEKjXaJOSLJfm +c,2,-29,25305,-537142430,-7683452043175617798,150,31648,598822671,11759014161799384683,0.8315913,0.946325164889271,9UbObCsVkmYpJGcGrgfK90qOnwb2Lj +a,1,-25,15295,383352709,4980135132406487265,231,102,3276123488,12763583666216333412,0.53796273,0.17592486905979987,XemNcT1xp61xcM1Qz3wZ1VECCnq06O +c,4,123,16620,852509237,-3087630526856906991,196,33715,3566741189,4546434653720168472,0.07606989,0.819715865079681,8LIh0b6jmDGm87BmIyjdxNIpX4ugjD +a,5,-31,-12907,586844478,-4862189775214031241,170,28086,1013876852,11005002152861474932,0.35319167,0.05573662213439634,MeSTAXq8gVxVjbEjgkvU9YLte0X9uE +a,2,45,15673,-1899175111,398282800995316041,99,2555,145294611,8554426087132697832,0.17333257,0.6405262429561641,b3b9esRhTzFEawbs6XhpKnD9ojutHB +b,3,17,14457,670497898,-2390782464845307388,255,24770,1538863055,12662506238151717757,0.34077626,0.7614304100703713,6x93sxYioWuq5c9Kkk8oTAAORM7cH0 +e,4,97,-13181,2047637360,6176835796788944083,158,53000,2042457019,9726016502640071617,0.7085086,0.12357539988406441,oHJMNvWuunsIMIWFnYG31RCfkOo2V7 +c,2,-60,-16312,-1808210365,-3368300253197863813,71,39635,2844041986,7045482583778080653,0.805363,0.6425694115212065,BJqx5WokrmrrezZA0dUbleMYkG5U2O +e,1,36,-21481,-928766616,-3471238138418013024,150,52569,2610290479,7788847578701297242,0.2578469,0.7670021786149205,gpo8K5qtYePve6jyPt6xgJx4YOVjms +b,5,-5,24896,1955646088,2430204191283109071,118,43655,2424630722,11429640193932435507,0.87989986,0.7328050041291218,JafwVLSVk5AVoXFuzclesQ000EE2k1 +a,3,13,32064,912707948,3826618523497875379,42,21463,2214035726,10771380284714693539,0.6133468,0.7325106678655877,i6RQVXKUh7MzuGMDaNclUYnFUAireU +c,1,41,-4667,-644225469,7049620391314639084,196,48099,2125812933,15419512479294091215,0.5780736,0.9255031346434324,mzbkwXKrPeZnxg2Kn1LRF5hYSsmksS +d,2,93,-12642,2053379412,6468763445799074329,147,50842,1000948272,5536487915963301239,0.4279275,0.28534428578703896,lqhzgLsXZ8JhtpeeUWWNbMz8PHI705 +c,3,73,-9565,-382483011,1765659477910680019,186,1535,1088543984,2906943497598597237,0.680652,0.6009475544728957,Ow5PGpfTm4dXCfTDsXAOTatXRoAydR +c,3,-2,-18655,-2141999138,-3154042970870838072,251,34970,3862393166,13062025193350212516,0.034291923,0.7697753383420857,IWl0G3ZlMNf7WT8yjIB49cx7MmYOmr 
+c,3,22,13741,-2098805236,8604102724776612452,45,2516,1362369177,196777795886465166,0.94669616,0.0494924465469434,6oIXZuIPIqEoPBvFmbt2Nxy3tryGUE +b,2,63,21456,-2138770630,-2380041687053733364,181,57594,2705709344,13144161537396946288,0.09683716,0.3051364088814128,nYVJnVicpGRqKZibHyBAmtmzBXAFfT +d,4,102,-24558,1991172974,-7823479531661596016,14,36599,1534194097,2240998421986827216,0.028003037,0.8824879447595726,0og6hSkhbX8AC1ktFS4kounvTzy8Vo +d,1,-8,27138,-1383162419,7682021027078563072,36,64517,2861376515,9904216782086286050,0.80954456,0.9463098243875633,AFGCj7OWlEB5QfniEFgonMq90Tq5uH +a,3,17,-22796,1337043149,-1282905594104562444,167,2809,754775609,732272194388185106,0.3884129,0.658671129040488,VDhtJkYjAYPykCgOU9x3v7v3t4SO1a +e,2,52,23388,715235348,605432070100399212,165,56980,3314983189,7386391799827871203,0.46076488,0.980809631269599,jQimhdepw3GKmioWUlVSWeBVRKFkY3 +b,5,68,21576,1188285940,5717755781990389024,224,27600,974297360,9865419128970328044,0.80895734,0.7973920072996036,ioEncce3mPOXD2hWhpZpCPWGATG6GU +b,2,31,23127,-800561771,-8706387435232961848,153,27034,1098639440,3343692635488765507,0.35692692,0.5590205548347534,okOkcWflkNXIy4R8LzmySyY1EC3sYd +c,1,-24,-24085,-1882293856,7385529783747709716,41,48048,520189543,2402288956117186783,0.39761502,0.3600766362333053,Fi4rJeTQq4eXj8Lxg3Hja5hBVTVV5u +a,4,65,-28462,-1813935549,7602389238442209730,18,363,1865307672,11378396836996498283,0.09130204,0.5593249815276734,WHmjWk2AY4c6m7DA4GitUx6nmb1yYS +d,1,125,31106,-1176490478,-4306856842351827308,90,17910,3625286410,17869394731126786457,0.8882508,0.7631239070049998,dVdvo6nUD5FgCgsbOZLds28RyGTpnx +b,4,17,-28070,-673237643,1904316899655860234,188,27744,933879086,3732692885824435932,0.41860116,0.40342283197779727,JHNgc2UCaiXOdmkxwDDyGhRlO0mnBQ +c,2,-106,-1114,-1927628110,1080308211931669384,177,20421,141680161,7464432081248293405,0.56749094,0.565352842229935,Vp3gmWunM5A7wOC9YW2JroFqTWjvTi +d,5,-59,2045,-2117946883,1170799768349713170,189,63353,1365198901,2501626630745849169,0.75173044,0.18628859265874176,F7NSTjWvQJyBburN7CXRUlbgp2dIrA +d,4,55,-1471,1902023838,1252101628560265705,157,3691,811650497,1524771507450695976,0.2968701,0.5437595540422571,f9ALCzwDAKmdu7Rk2msJaB1wxe5IBX +b,2,-60,-21739,-1908480893,-8897292622858103761,59,50009,2525744318,1719090662556698549,0.52930677,0.560333188635217,l7uwDoTepWwnAP0ufqtHJS3CRi7RfP +d,3,-76,8809,141218956,-9110406195556445909,58,5494,1824517658,12046662515387914426,0.8557294,0.6668423897406515,Z2sWcQr0qyCJRMHDpRy3aQr7PkHtkK +e,4,73,-22501,1282464673,2541794052864382235,67,21119,538589788,9575476605699527641,0.48515016,0.296036538664718,4JznSdBajNWhu4hRQwjV1FjTTxY68i +b,4,-117,19316,2051224722,-5534418579506232438,133,52046,3023531799,13684453606722360110,0.62608826,0.8506721053047003,mhjME0zBHbrK6NMkytMTQzOssOa1gF +a,4,-101,11640,1993193190,2992662416070659899,230,40566,466439833,16778113360088370541,0.3991115,0.574210838214554,NEhyk8uIx4kEULJGa8qIyFjjBcP2G6 +b,5,62,16337,41423756,-2274773899098124524,121,34206,2307004493,10575647935385523483,0.23794776,0.1754261586710173,qnPOOmslCJaT45buUisMRnM0rc77EK +c,4,-79,5281,-237425046,373011991904079451,121,55620,2818832252,2464584078983135763,0.49774808,0.9237877978193884,t6fQUjJejPcjc04wHvHTPe55S65B4V +b,2,68,15874,49866617,1179733259727844435,121,23948,3455216719,3898128009708892708,0.6306253,0.9185813970744787,802bgTGl6Bk5TlkPYYTxp5JkKyaYUA +c,1,70,27752,1325868318,1241882478563331892,63,61637,473294098,4976799313755010034,0.13801557,0.5081765563442366,Ktb7GQ0N1DrxwkCkEUsTaIXk0xYinn 
+e,2,-61,-2888,-1660426473,2553892468492435401,126,35429,4144173353,939909697866979632,0.4405142,0.9231889896940375,BPtQMxnuSPpxMExYV9YkDa6cAN7GP3 +e,4,74,-12612,-1885422396,1702850374057819332,130,3583,3198969145,10767179755613315144,0.5518061,0.5614503754617461,QEHVvcP8gxI6EMJIrvcnIhgzPNjIvv +d,2,122,10130,-168758331,-3179091803916845592,30,794,4061635107,15695681119022625322,0.69592506,0.9748360509016578,OPwBqCEK5PWTjWaiOyL45u2NLTaDWv +e,3,71,194,1436496767,-5639533800082367925,158,44507,3105312559,3998472996619161534,0.930117,0.6108938307533,pTeu0WMjBRTaNRT15rLCuEh3tBJVc5 +c,5,-94,-15880,2025611582,-3348824099853919681,5,40622,4268716378,12849419495718510869,0.34163946,0.4830878559436823,RilTlL1tKkPOUFuzmLydHAVZwv1OGl +d,1,-72,25590,1188089983,3090286296481837049,241,832,3542840110,5885937420286765261,0.41980565,0.21535402343780985,wwXqSGKLyBQyPkonlzBNYUJTCo4LRS +e,1,71,-5479,-1339586153,-3920238763788954243,123,53012,4229654142,10297218950720052365,0.73473036,0.5773498217058918,cBGc0kSm32ylBDnxogG727C0uhZEYZ +e,4,96,-30336,427197269,7506304308750926996,95,48483,3521368277,5437030162957481122,0.58104324,0.42073125331890115,3BEOHQsMEFZ58VcNTOJYShTBpAPzbt +a,2,-48,-18025,439738328,-313657814587041987,222,13763,3717551163,9135746610908713318,0.055064857,0.9800193410444061,ukyD7b0Efj7tNlFSRmzZ0IqkEzg2a8 +a,1,-56,8692,2106705285,-7811675384226570375,231,15573,1454057357,677091006469429514,0.42794758,0.2739938529235548,JN0VclewmjwYlSl8386MlWv5rEhWCz +e,2,52,-12056,-1090239422,9011500141803970147,238,4168,2013662838,12565360638488684051,0.6694766,0.39144436569161134,xipQ93429ksjNcXPX5326VSg1xJZcW +a,1,-5,12636,794623392,2909750622865366631,15,24022,2669374863,4776679784701509574,0.29877836,0.2537253407987472,waIGbOGl1PM6gnzZ4uuZt4E2yDWRHs +b,1,12,7652,-1448995523,-5332734971209541785,136,49283,4076864659,15449267433866484283,0.6214579,0.05636955101974106,akiiY5N0I44CMwEnBL6RTBk7BRkxEj +e,5,64,-26526,1689098844,8950618259486183091,224,45253,662099130,16127995415060805595,0.2897315,0.5759450483859969,56MZa5O1hVtX4c5sbnCfxuX5kDChqI +c,4,-90,-2935,1579876740,6733733506744649678,254,12876,3593959807,4094315663314091142,0.5708688,0.5603062368164834,Ld2ej8NEv5zNcqU60FwpHeZKBhfpiV +e,5,-86,32514,-467659022,-8012578250188146150,254,2684,2861911482,2126626171973341689,0.12559289,0.01479305307777301,gxfHWUF8XgY2KdFxigxvNEXe2V2XMl +c,2,-117,-30187,-1222533990,-191957437217035800,136,47061,2293105904,12659011877190539078,0.2047385,0.9706712283358269,pLk3i59bZwd5KBZrI1FiweYTd5hteG +a,3,14,28162,397430452,-452851601758273256,57,14722,431948861,8164671015278284913,0.40199697,0.07260475960924484,TtDKUZxzVxsq758G6AWPSYuZgVgbcl +c,2,29,-3855,1354539333,4742062657200940467,81,53815,3398507249,562977550464243101,0.7124534,0.991517828651004,Oq6J4Rx6nde0YlhOIJkFsX2MsSvAQ0 +b,4,-59,25286,1423957796,2646602445954944051,0,61069,3570297463,15100310750150419896,0.49619365,0.04893135681998029,fuyvs0w7WsKSlXqJ1e6HFSoLmx03AG +a,1,83,-14704,2143473091,-4387559599038777245,37,829,4015442341,4602675983996931623,0.89542526,0.9567595541247681,ErJFw6hzZ5fmI5r8bhE4JzlscnhKZU +a,3,-12,-9168,1489733240,-1569376002217735076,206,33821,3959216334,16060348691054629425,0.9488028,0.9293883502480845,oLZ21P2JEDooxV1pU31cIxQHEeeoLu +c,4,3,-30508,659422734,-6455460736227846736,133,59663,2306130875,8622584762448622224,0.16999894,0.4273123318932347,EcCuckwsF3gV1Ecgmh5v4KM8g1ozif +a,3,-72,-11122,-2141451704,-2578916903971263854,83,30296,1995343206,17452974532402389080,0.94209343,0.3231750610081745,e2Gh6Ov8XkXoFdJWhl0EjwEHlMDYyG 
+c,2,-107,-2904,-1011669561,782342092880993439,18,29527,1157161427,4403623840168496677,0.31988364,0.36936304600612724,QYlaIAnJA6r8rlAb6f59wcxvcPcWFf +c,5,118,19208,-134213907,-2120241105523909127,86,57751,1229567292,16493024289408725403,0.5536642,0.9723580396501548,TTQUwpMNSXZqVBKAFvXu7OlWvKXJKX +c,3,97,29106,-903316089,2874859437662206732,207,42171,3473924576,8188072741116415408,0.32792538,0.2667177795079635,HKSMQ9nTnwXCJIte1JrM1dtYnDtJ8g +b,3,-101,-13217,-346989627,5456800329302529236,26,54276,243203849,17929716297117857676,0.05422181,0.09465635123783445,MXhhH1Var3OzzJCtI9VNyYvA0q8UyJ +a,2,-43,13080,370975815,5881039805148485053,2,20120,2939920218,906367167997372130,0.42733806,0.16301110515739792,m6jD0LBIQWaMfenwRCTANI9eOdyyto +a,5,-101,-12484,-842693467,-6140627905445351305,57,57885,2496054700,2243924747182709810,0.59520596,0.9491397432856566,QJYm7YRA3YetcBHI5wkMZeLXVmfuNy +b,5,-44,15788,-629486480,5822642169425315613,13,11872,3457053821,2413406423648025909,0.44318348,0.32869374687050157,ALuRhobVWbnQTTWZdSOk0iVe8oYFhW +d,4,5,-7688,702611616,6239356364381313700,4,39363,3126475872,35363005357834672,0.3766935,0.061029375346466685,H5j5ZHy1FGesOAHjkQEDYCucbpKWRu +e,1,120,10837,-1331533190,6342019705133850847,245,3975,2830981072,16439861276703750332,0.6623719,0.9965400387585364,LiEBxds3X0Uw0lxiYjDqrkAaAwoiIW +e,3,-95,13611,2030965207,927403809957470678,119,59134,559847112,10966649192992996919,0.5301289,0.047343434291126085,gTpyQnEODMcpsPnJMZC66gh33i3m0b +d,3,123,29533,240273900,1176001466590906949,117,30972,2592330556,12883447461717956514,0.39075065,0.38870280983958583,1aOcrEGd0cOqZe2I5XBOm0nDcwtBZO +b,4,47,20690,-1009656194,-2027442591571700798,200,7781,326151275,2881913079548128905,0.57360977,0.2145232647388039,52mKlRE3aHCBZtjECq6sY9OqVf8Dze +e,4,30,-16110,61035129,-3356533792537910152,159,299,28774375,13526465947516666293,0.6999775,0.03968347085780355,cq4WSAIFwx3wwTUS5bp1wCe71R6U5I \ No newline at end of file diff --git a/datafusion-integration/testing/data/csv/aggregate_test_100.csv.gz b/datafusion-integration/testing/data/csv/aggregate_test_100.csv.gz new file mode 100644 index 0000000000000000000000000000000000000000..62f622a9b5c15e8873dc7c0b3ee7cb7c844c7a97 GIT binary patch literal 9117 zcmV;OBVybiiwFpGx5#4v17T-pa%E>>bY)+3Wpi|2F)%PLV{>)@HCxAeGE1`DpJ!>^ zuS}QeCEf{;0O5AR2!uy?p8iH8|IEyp(FoL3rHPElOr|=Snq+E|X_`!3GWE$cB-5Bo zvt+6j{=w@SZ_xI?X<`zc=%`he7-g+eT6>ecd8hGz(&${8xlAkL(!{tVwF*B$&fOI-4ns>B$t(Q>+39xvISJ(5gtHk`zPvy2Ko6haC=(1k<+3cS0g*>c; z<3t0`DIO!56nD|yCBgFF$_Jks{3gv}fSF=~XIc%iL|kAT23 za0$++V`gG7#supPB1DBO!RzoRUNfl;(OIMlUcoIg_%&m|C>$N9P~IWxGe(kjIA=5n z@}Tjkr2BJM*ZWnU*3a>vr$3JU*}hYpwS9Nu@74e2iA^*DScHX3ka^zP zh;)-UhCLE(nUgqLVqFMU^SAgVj;d^M9=?Mo4KtI5K@=#lQGl6|iW%OeaAuPfFZEU> zo-aS$<3Np1=jw5=vfq_!`MtSFbBH9I3I9R9@xO5|ma6dz$c?BN(&I5g~^ z;Qo;@D+w4QBT1=VbI zFiJ>)N*+;UGUwqoB)v_8qn%)Lcqk;eHKHDN3P3!M5RbSvT&5-V5PEWikODpet)uWe z0~kRP8u%DCe$3Xl$Mn0_-Hf`M($V%d>#o^qS2Ks{A&Pk%a){4mcq4a|u{<8`>?{&3 zApc>T2!wzi6OTL$2^|KAfJcyc__P#p5`%`typ|LBm%$NExSVoE6KF%ALHi^19Bzm6 z^{LQ{`RDcUJbb)QwqIG}kd4xpeH;VZ;&O00?vyH(ICukYLGoK*KfxIJ#W+Jm;{tdU zzfA!bKq}xp5Jw>bJ*tpm>N_BLX8Dsm9yIc@)?nr11B#xn5{K?nq&;4uzf*L)sOAT#g*JVU(3+wl<@PakZ;w1*vlj*f91 zJ?_KE0Xz^4{|}x85CDk0OM`*k@~5U7=SIIjouzxf*-;$oKWA%d+N1HFby zo8nsVq9SOa4yi@%ysebxHB^wvUXTCoh3nvaI?XTZVL{3oeWv-jNMq!vWHC;JEY^q$ zS|7haeL!}hj<^?gBDW%rYZ>GCc+KQQ023K(Cg~no$ao|kBmmqsQUVBU6JP$QZ{Bm$ z?@xExuOIEfIIC7>@8kBQd~J_GNPsX34l{;M#vcNUeE=4q1_1Ot3buhCtrjA}I%YH2 zG;zxyXa-WU^@JKYz9PLT{=S+LoCDWJex); 
zqJFy+#0JK)$s-hCz-4DJOG}})UA2mX&0&7e$KBU)wZw@mX$CJqOtQ#(0HljV9hU#ToEGub3$^zge1R%mB4{?`}YE+a+q(IoDz#=KJaHWZn85h8i;8Q>t3bID(w7p;X zkMGfCyexg4kA44h`3k+qs%ZxIf)l?Xj}Q%>p9%C;h*U&8tOaKxN`qEpx}J_hJR|C1 z8fmE`ftGCU`S#V ztcBVv6bWx9N{P7`2?#p`J}Fn~lG?bpt~TGVwZ`T5eYGsEM$K)ldSAkTw|V+6l#tu6 z`VZED5Jic8iy+7N2~`3-iFhvL64oTq!KIN<5CMU50PGv+1d2o!iRY+Su??A|8|{nJ zv0#Hr!gR@Z@t!~L|3Ym#%L;C}>s_xm*ZQmd%~3_?s3vfNXSK%-QM^FK0N63{s9pG% z!ILa9#)IV~fz&)joN01|Kx@I8%r?{<#GLVl<`RyLv_QBEH$jC{sBW2|OaUEkm&4De z4VCBj*!`|^X5V~VH}l4Mv3p+)m;c;Gq7))o0N58w8;twLbY=)6+6`5cASD2a_(^Xe z_CPeD8h}|Dy$ciri9j(TdRs&s>a3u5K!hRzzhBY(=}hk^eXpN1hI-jgJN#1R4q~9_R_82u4k$a0-EqT1(Xe2=y==R0G@s z!{I#;Rx$OGW=Jo`jE8u`p$!iNZG-bA+1I2!DV+N2?dVXiOe=qjpGKyR?cHr+-d8VC zLiUBC2oE9xok48^cA{LsBFJl44(0&Dz^Q;m!?KGy1NHtbJA#*8fkEN;voCcmrLBmOwz*yidviB^PzHgJ0A1)7#T`}9QF6Szx zB)LUG0TH0f6oms~80?ZI9_2rcs6|L%CL@jl=m`Wee}B@T#jyS%18LX#^7g~ z>E_^kKmz~|92X}+ngI$F-)L|N1>npO_oN$uBv9kPq?4K!os!~+3%Vl?}HEVO>0e~1360#0Z1+nR+3lJfRynV4^7Z504fgB5?cd-0InJrs z*JLwqj@A|TW!kWk7UX8NU|E4gK{jjFI|^tX1N4MwKp-Y2Ieq|y3$Os9F0fYt`Uph@ z`J|8|pt}qv>Z4Rd9K;P{KQQzG-!54k_OaoP=h6J*I`^x`$JyT(^?N#M_gCA~t1ZD0 ziR{FnK!HRm#tqzWXa!9^0n#Xd2$~K1@$Dp;3e+6GMBKt<8Y%z?22LqWB=8!fiFhtp zWC$`6c!j6o8TIdA5pg;GC=Uxd02iBkhn?&%lh@5&;GFS zwG7SYrP3^Zx2Bt~(XRZJ`~91L3F$_-y@6dL!V!AxD!|xGJm4k-3;8>84r+>>Qbr^~uD}8aM;L?U0L~+D6BtPM@hmVBFiNsixC$nQB@Hl|Lq$N%I*$sNtWS)kiW2}kloBj6As4NMU%3M>T`Lz;3Zz$jlVLu|4ktBlYkoIDUn z!4@*0Rr}^XDu_n39$IB$~l3|A%`L;xHWg<6tdWPp~S zq`3tXIwU4O8zcZ_27yfaOkECU=QP_OivE;7fU@{wjXEjW#)4!~%R#$ftfV&BsEQqL znysHO=*7pXmQS+ogunds*sC>c3?21GjftGTT4Y{*-ZAD@5=C@-iWjX zT?c5&p)5s&=p{D7K=yW{etp;fogONkV`Ka~sXp?#?Ropm4|C<89-=!H?(+sKkhxO5 z1w*d;hUx~Slv?W8mw3Fi4jT$j{68~ zsQtn(yX)m-7;^=^K9{@cdVkqnd(TJg?_WBAibrAy|3!(NvZ00NBRCXc3S^0(NR0QO zuIxbpc8~$ccjO~g0d9bLhTj7brIbMa(fBVmiGW~R1GE&dktTT z7yDiFn5o{;U8*mb3~U7iLN_3Qj5cK(5S#=Y5RL-pcb*-QgvU;z`YhBF{qUX5hR6$7Rs9XWXXcVF5~r*pggXb+2K zIanUXhr-98lIvA`G`ucw2b&AK(LJ|4Pq4R`r$h!$Q$lS0s4|8C=f6fV21h|8cadR;#4ek zIGCXTk=`Mg2G~&=@C-xRk-8TIfmzB#Ffgo1-T}ENQ!$zKXV<;oeHC4KQCOTN?eY8U zWMb!0=^b`=ncGO}qj&+UaSb+>;SUI8V1NhnBDJB#h}u#5BH&WUO*Xlq;{ufkJm-Kn zh`g6XO4-HL0+pPW7&5<%($m`YQ75Jzr*%yyT0Fw#Ybh?*|gPmm;ZdR?SB^1 z?3%KfVWQ^sk+_!JTT5M~l1{WVw*ZK2Oh~&_Cjs<_SOL}qlENO&s|yGP_v+v}HZ~9$ z22qNrNcso6*bXkcrptv}yO50rrvA4tAA5t^?Ab_qkF@V<~uS&B650v)=_0+M?_oCR2oqxdtO(+MjK(a?RSA0?qt!{5MxPI0j z`N`$46$-=7*JiRfm|5*FC$)`)gcDYJ%P^>=FX}jP#M1!HcpB;}rvM4-e!|Y0qsn9J z0O^R_!g&;NP-Z1S4XOcJC}oG<3Bh5@Ir+(Q?}g{Bm}_ijm(W?8?Wz8q*SGuJ)$&6D zCDBM6X0URQ#1iBz8(i=^ZpM)(N5{zN1lDvUk+2LN4{vEUCfS!%RFn?h<&cgifdiF) zb#FP)W!I3+NrdetTyDjEZ+;lucDWCA`8l4e^{y>+enaQyLt3S8xL0JgB4Hrbchk*!VN2*Cns+27fN4<+or1WO80rQPS5ng1*;&vR8p)!xnX)yi# zQ^!Z4-f32ght9)(fBuD!b@jY%h_nsTaz`|umG z>)l5_{{C-hOEPF>q7G$&r~s{eb0qC3>(%o58Ok8Jp}3yE{f zM2V(hmm>bN7hlqVY+xNqcd}isnkDtjS0{fB-)gQO=VtHo**=}x-+EDgjkq|L(rk^B zv{{}nV{1KQy8&qqumVww6h{DW7!LFmzM=r2O7d(6vfZq32(TF^hO(r8|MU{aR2({U zikc>^((1>uHn;$hLZxynEOZGe1oyL zFw%d+mZTLBPBTuT9p}hFI`EFNQ%Z@U)C%FXmcY@k7M`S#K*@3U(Qa=~n?n7&m%7R2 z>-W^`?3(It8bYZd?Y1|lRs@WeSzwBgN(GITOg2fi@N}o;LRhi0uL~x z*zJK@1hWy21c+wNijG9}BVL2ukjN!x|MU5^s?S#+)74a^x9YXp`AG7>5yIL3Y1_k`|~O!c#U$pCr>n4zK}^ zg>kS1qI|OGRW?4qcD>E|V>IgB53as$JibR?IpHJh=&{F#5H;-n3Bnjo0TAQl*F=&` zR0|{zJ&RIC(vQ5!IN8lO69YFQ-xP?3kkL*GuH#q`g^>c;aJJYuPeSEWwRG|Cep?q1 zMyK9)ZMatV(eG|2l=jWjRVdZ$BSEA=aZBpUR0idQa*IQ9y3BKG1MbYxKHGaR0ykX1 zTG?ds+&6jES%kipz6~6Pn;|72oVd1-6)Kh3p5<-@+P zC>7!VYH39dvMu?4fVy#b2A(bM=O`Ij#*Q(#i+qG;50gBJ4tAVqKnv^i#fmhfwth|B zQa%R?QU|V>e3q*H>+~Zo?loU()@y&)>8`e1JU?2i+V%RH+fviAYegeLy>M-VLYpfP znuB|GhC#f*fH})XtZ}B!AplDn>X?rNO>jv@j)U~K1qSB&}5J+9h 
z0^w?r?fOW)XJwq<`Q7w3@A}+Wf3&~Km*@8VF{|eK;m8ORyTje;9tvbsY z8-mEQod?jNp5#;)`~$JX$OiPVPB=0if*0Q`OB_8aMZB5lahSm0?164+-}2T zx6*ij-}!?%(q5g^~OwuRLeO%(&vp5%doT0$r86VNOEyS z$DS2u{#-B6(%WJ)lvDwVOi5G4Nv{v-s-(%rEg=d5WC|)XU}c)GOx3!(wdS$O7kBsl zq5{AxsLOI(8jW8qZMJ5?UO3SM;sq|s1ji0ZN}F?BLY;vc4{5~uK!!snaHR~l2k(_F z2}H2g64#_s(&A&cO$COWAev&0v-wiH`gv-W%3~M%(^jutA9a_#{2}-8oSbD0@s>tu zWy`u@#tkVWJuME>BQh66ke*-*MEXpg(|;5yr#ZpRST$14wKEw<5M5;+!oibZWF{>- zPQL;CM1;%wQ4AAR`dNi@Yg6uaw$HJu9oMDj!hFZqP|LA_m9ZBK(m+1s*s9??m`i*{ z_$<*)rdk}na5_SuvNFKn@}I&90D`cUPXBcxxi=(}Kg;PG2&JJCj7hzv`qRz)wq4yu zo9>1ijLps9>i$a0LE-toM#LMJFeyhE)W{T=LyXMXzhDv=a|$zoB2j@LH8p2K|H!x( zZiK@}PM`qSLL4*lIN>CkWzjp%*+E&-F!d`?T|Q5sg zEA8^{zSGOEL%y@Vmsao__!^rLM3jUgmhDkgC+4*04j8vYfJk2xRbP`^3;c5S`vpNHwxohG}&&;2rV>tbPf ztiAYJMs-BB2{{Ji;>HRg%Y|A?ikzlWU^r+akHi}i%4t+Dt|oH-0iO-WTFu7)tLk$P z*h^E3(uhnQ*zei3N#18>zKX5V)jeC&x` z_$tTYVq_Y#^8I&lzqU%lx#^o>bMsI6YndAB$QdA@2XT~TvU;N#t%y#|^_0k^4u(@8 zw6NKPvJ2MB%;2mYe@`{(WROYUapfaAAptNp*Kc9=NEkl6ukEkPCga=8HS*)XnlAs| z=A+)sEzk4Ht2@e@a}x^5&S@rtJtaCpiU1PXu|cMDS5S+7f~ztnJQX)39zqv-Et=Pc zkF1e0Y-EJYX(fBdUa{W}8~4t|#@G1%?B7pEJiFp^gOBd$GG6o-lj+M!T#}}gfC(JL z7dRSIo4SJtX*lA58~{z$l#i?jP)4k0+$}>HVdD~SU=e&R00!!p7dy!~$c@AiKxvw^ zkG`IE?tV9$tI=lJuGQD&)6g_e-D~bsy)>i~8P8b^*f6}rHnQOknKG2u6xJ`~1JhQp zIKY}kmg6?fPr)GoUI?^Q=%q*z#M!Ixa>tGu#k0|ha+M`Nm&f&_Zeeh0bT`9eUC({B z+}*!#@5<&X0+G>5X9VX5j7a6?wv;r)xxmAQEn*UPMomj1nKl=DaeB5U**6KYa}FP| zJ1?VJat0@g7A*7C8{r_1WUKU7FW>y~Id(tWn){2BWeL)IbDHj!jq#U6&>LAc*(=w0 zUu*gdYDZ*-Of`sV)G(kTmtNqmNFhn)O4Z4xJAjDGV;Q%k;K5WD+ZbfvPQw4G(>PtC zg2TP`?_;AsOubax@9%wm__BIhyMI7Be=VhfJ2ta}(lQyfB0<$v8s zwn&LCY)r88=-6fC2ni21>@V^0oJS_yPona5Ts(jcpb8Y5*73D?3Nwb+S0hl8tbAbt z#f{sAusSzE5tPa2X!Bjm?dm!I7ymZ7iT`ogZF+}xnhwf8A_X`>*BQ6_!E#e>lgSxT zYz@~ES%WE=@cEIr6r>ZtwGejV60SaRGXOxH!kdOW?V{a49bR%W7-B5+AoCJbsn`4b zx*eLW?r?nU=Nq~5b$AJFkfp}3-WbX_lGCzG8n^HhcUVvbP{vY@OSm^`;3427ce}aP z#w`i1;c;yPtdHUWq#1|j;tCEF#ak3tRsh^nZE|AC{U21(YNOly%yzxqZCI$+&U0P= zYwFo~v@4$LaUrWM(nf-PjMS8ri$va#Pdm=#2JpCZ2hXJ-t)E3!FTg#zPoh_y*ebz_Y zFzV#&XnvmNt2K99H^=k$VSoE`X+2(xBpfvIqi_yGw#KDM$8|c6K8b!LL;CfN3eE}Ro2{rQzF?g5INQdYW2NKwmf97zDguws$q_C8SR z{kN3A&rWuzzHVxB{~fK0$G>&%J9pdv)&|ljdCP?0Yc6eZ$JQ~=x%H37aH}nls<92h zo;JVCk)RZFvxb9QydTKJWr&Rb@tGV$wE2rmru=49Dhm^uy=9jydWXm7uGB6zr-#$W zuHGCMyk9J>{k2;)k>)@3yP6p6`n{8ZpkIZRy zuGr>6=!%{s&^jf|1O=D9G9}+mi)@eS@AmYsRGF!1`#7GPnj7ZZTeT}rZ|D5DEo6x< z!KbqLD_Y63yYzt+Q3ROnP573xR6gH8ZPBm$v7&J{a;dmAiF Filter -> Project + +2. wait for all before making progress +Scan -> Sum + +3. hash join + 2 children hashbuild and hashjoin + + +taskset --cpu-list 0-3 your_program, taskset --cpu-list 0,1,16,17 your_program + +Velox Join +https://facebookincubator.github.io/velox/develop/joins.html# + +Testing on aws +https://docs.aws.amazon.com/AWSEC2/latest/WindowsGuide/dedicated-hosts-overview.html + + +https://docs.google.com/document/d/1txX60thXn1tQO1ENNT8rwfU3cXLofa7ZccnvP4jD6AA/edit#heading=h.3iwlbn2gzs29 + + + +1. Sum +2. HashJoin +3. 
Breaking HashJoin in two parts discuss with aditya from scheduler diff --git a/vayu/src/lib.rs b/vayu/src/lib.rs new file mode 100644 index 0000000..94e3907 --- /dev/null +++ b/vayu/src/lib.rs @@ -0,0 +1,79 @@ +mod pipeline_executor; +use arrow::array::RecordBatch; +use arrow::util::pretty; +use pipeline::Pipeline; +use pipeline_executor::PipelineExecutor; +pub mod operators; +pub mod pipeline; +pub mod sinks; + +pub mod store; +use crate::sinks::SchedulerSinkType; +use crate::store::Blob::{HashMapBlob, RecordBatchBlob}; +use crate::store::Store; +use core::panic; +use datafusion::physical_plan::ExecutionPlan; +use std::sync::Arc; +pub struct VayuExecutionEngine { + pub store: Store, +} + +impl VayuExecutionEngine { + pub fn new() -> VayuExecutionEngine { + VayuExecutionEngine { + store: Store::new(), + } + } + + pub fn execute(&mut self, scheduler_pipeline: SchedulerPipeline) { + let plan = scheduler_pipeline.plan; + // convert execution plan to a pipeline + + let pipeline = Pipeline::new(plan, &mut self.store, 1); + // execute the plan to get the results + let mut pipeline_executor = PipelineExecutor::new(pipeline); + let result = pipeline_executor.execute().unwrap(); + + // do the sinking - very simple API + // no need to create a seperate class and introduce indirection unless it moves out of hands + // to call one function we would need 30+ lines otherwise + let sink: SchedulerSinkType = scheduler_pipeline.sink; + match sink { + SchedulerSinkType::PrintOutput => { + pretty::print_batches(&result).unwrap(); + } + SchedulerSinkType::StoreRecordBatch(uuid) => { + self.store.append(uuid, result); + } + SchedulerSinkType::BuildAndStoreHashMap(uuid, join_node) => { + let mut sink = sinks::HashMapSink::new(uuid, join_node); + let map = sink.build_map(result); + self.store.insert(uuid, map.unwrap()); + } + }; + } + pub fn sink(&mut self, uuid: i32) { + let blob = self.store.remove(uuid); + match blob { + Some(blob) => match blob { + RecordBatchBlob(result) => { + pretty::print_batches(&result).unwrap(); + } + HashMapBlob(results) => { + pretty::print_batches(&[results.batch().clone()]).unwrap(); + } + }, + None => panic!("no blob for {uuid} found"), + } + } +} +pub struct SchedulerPipeline { + pub plan: Arc, + pub sink: SchedulerSinkType, +} + +impl SchedulerPipeline { + pub fn new(plan: Arc, sink: SchedulerSinkType) -> SchedulerPipeline { + SchedulerPipeline { plan, sink } + } +} diff --git a/vayu/src/operators/filter.rs b/vayu/src/operators/filter.rs new file mode 100644 index 0000000..8b58cba --- /dev/null +++ b/vayu/src/operators/filter.rs @@ -0,0 +1,27 @@ +use crate::pipeline::{IntermediateOperator, PhysicalOperator}; +use datafusion::arrow::array::RecordBatch; +use datafusion::error::Result; +use datafusion::physical_plan::filter::batch_filter; +use datafusion::physical_plan::PhysicalExpr; + +use std::sync::Arc; +pub struct FilterOperator { + predicate: Arc, +} +impl FilterOperator { + pub fn new(predicate: Arc) -> FilterOperator { + FilterOperator { predicate } + } +} + +impl IntermediateOperator for FilterOperator { + fn execute(&mut self, input: &RecordBatch) -> Result { + batch_filter(input, &self.predicate) + } +} + +impl PhysicalOperator for FilterOperator { + fn name(&self) -> String { + String::from("filter") + } +} diff --git a/vayu/src/operators/join.rs b/vayu/src/operators/join.rs new file mode 100644 index 0000000..0dd24b8 --- /dev/null +++ b/vayu/src/operators/join.rs @@ -0,0 +1,54 @@ +use crate::pipeline::{IntermediateOperator, PhysicalOperator}; +use 
datafusion::arrow::array::RecordBatch; +use datafusion::error::Result; +use datafusion::physical_plan::joins::hash_join::{ + create_hashes_outer, HashJoinStream, HashJoinStreamState, ProcessProbeBatchState, +}; +use datafusion::physical_plan::joins::utils::StatefulStreamResult; +pub struct HashProbeOperator { + probe: HashJoinStream, +} + +impl HashProbeOperator { + pub fn new(probe: HashJoinStream) -> Self { + Self { probe } + } +} + +impl IntermediateOperator for HashProbeOperator { + fn execute(&mut self, input: &RecordBatch) -> Result { + let mut hashes_buffer: Vec = vec![]; + let random_state = ahash::RandomState::with_seeds(0, 0, 0, 0); + let probe = &mut self.probe; + + // These things are being done in datafusion::fetch_probe_batch + create_hashes_outer( + &input, + probe.on_right.clone(), + &mut hashes_buffer, + &random_state, + )?; + probe.hashes_buffer = hashes_buffer.clone(); + probe.state = HashJoinStreamState::ProcessProbeBatch(ProcessProbeBatchState { + batch: input.clone(), + offset: (0, None), + joined_probe_idx: None, + }); + + // probe the build side + let output = probe.process_probe_batch().unwrap(); + let o = match output { + StatefulStreamResult::Ready(t) => t.unwrap(), + StatefulStreamResult::Continue => { + panic!("got continue in hash probe") + } + }; + Ok(o) + } +} + +impl PhysicalOperator for HashProbeOperator { + fn name(&self) -> String { + String::from("hash probe") + } +} diff --git a/vayu/src/operators/mod.rs b/vayu/src/operators/mod.rs new file mode 100644 index 0000000..1155748 --- /dev/null +++ b/vayu/src/operators/mod.rs @@ -0,0 +1,4 @@ +pub mod filter; +pub mod join; +pub mod projection; +pub mod scan; diff --git a/vayu/src/operators/projection.rs b/vayu/src/operators/projection.rs new file mode 100644 index 0000000..8d29f42 --- /dev/null +++ b/vayu/src/operators/projection.rs @@ -0,0 +1,28 @@ +use crate::pipeline::{IntermediateOperator, PhysicalOperator}; +use arrow::datatypes::SchemaRef; +use datafusion::arrow::array::RecordBatch; +use datafusion::error::Result; +use datafusion::physical_plan::projection::batch_project; +use datafusion::physical_plan::PhysicalExpr; +use std::sync::Arc; +pub struct ProjectionOperator { + expr: Vec>, + schema: SchemaRef, +} +impl ProjectionOperator { + pub fn new(expr: Vec>, schema: SchemaRef) -> ProjectionOperator { + ProjectionOperator { expr, schema } + } +} + +impl IntermediateOperator for ProjectionOperator { + fn execute(&mut self, input: &RecordBatch) -> Result { + batch_project(input, self.expr.clone(), self.schema.clone()) + } +} + +impl PhysicalOperator for ProjectionOperator { + fn name(&self) -> String { + String::from("projection") + } +} diff --git a/vayu/src/operators/scan.rs b/vayu/src/operators/scan.rs new file mode 100644 index 0000000..14eff69 --- /dev/null +++ b/vayu/src/operators/scan.rs @@ -0,0 +1,36 @@ +use crate::pipeline::{PhysicalOperator, Source}; +use datafusion::arrow::array::RecordBatch; +use datafusion::physical_plan::SendableRecordBatchStream; +use futures::stream::StreamExt; +use tokio::task; +pub struct ScanOperator { + stream: SendableRecordBatchStream, +} +impl ScanOperator { + pub fn new(stream: SendableRecordBatchStream) -> ScanOperator { + ScanOperator { stream } + } +} +// right now scan operator is blocking by design +// i don't wish to pass the execution access to tokio runtime. +// eventually we would have the main process call async functions to get data and then this data would +// be passed to the worker who would process the data parallely without any runtime. 
+// basically scan operator would not be called on worker thread which are only for CPU tasks +// zero context switches let's goooo!!!!1 +impl Source for ScanOperator { + fn get_data(&mut self) -> Option { + let block = task::block_in_place(|| { + tokio::runtime::Runtime::new() + .unwrap() + .block_on(self.stream.next()) + }); + let t = block.transpose(); + t.unwrap() + } +} + +impl PhysicalOperator for ScanOperator { + fn name(&self) -> String { + String::from("scan") + } +} diff --git a/vayu/src/pipeline.rs b/vayu/src/pipeline.rs new file mode 100644 index 0000000..468112b --- /dev/null +++ b/vayu/src/pipeline.rs @@ -0,0 +1,114 @@ +use crate::operators::filter::FilterOperator; +use crate::operators::join::HashProbeOperator; +use crate::operators::projection::ProjectionOperator; +use crate::store::Store; +use arrow::array::BooleanBufferBuilder; +use core::panic; +use datafusion::arrow::array::RecordBatch; +use datafusion::datasource::physical_plan::CsvExec; +use datafusion::error::Result; +use datafusion::execution::context::SessionContext; +use datafusion::execution::SendableRecordBatchStream; +use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; +use datafusion::physical_plan::filter::FilterExec; +use datafusion::physical_plan::joins::hash_join::BuildSide; +use datafusion::physical_plan::joins::hash_join::BuildSideReadyState; +use datafusion::physical_plan::joins::HashJoinExec; +use datafusion::physical_plan::projection::ProjectionExec; +use datafusion::physical_plan::repartition::RepartitionExec; +use datafusion::physical_plan::ExecutionPlan; +use datafusion::prelude::SessionConfig; +use std::sync::Arc; +pub struct Pipeline { + pub source: Option, + pub operators: Vec>, + pub state: PipelineState, +} +pub struct PipelineState { + uuid: i32, +} +impl Pipeline { + pub fn new(plan: Arc, store: &mut Store, uuid: i32) -> Pipeline { + let mut pipeline = Pipeline { + source: None, + operators: vec![], + state: PipelineState { uuid }, + }; + make_pipeline(&mut pipeline, plan, store); + pipeline + } +} +fn make_pipeline(pipeline: &mut Pipeline, plan: Arc, store: &mut Store) { + let p = plan.as_any(); + let config = SessionConfig::new().with_batch_size(32); + let ctx = Arc::new(SessionContext::new_with_config(config)); + let context = ctx.task_ctx(); + + // set batch size here + // println!("batch size {context.se}"); + if let Some(_) = p.downcast_ref::() { + let stream = plan.execute(0, context).unwrap(); + println!("adding source"); + pipeline.source = Some(stream); + return; + } + if let Some(exec) = p.downcast_ref::() { + make_pipeline(pipeline, exec.input().clone(), store); + let tt = Box::new(FilterOperator::new(exec.predicate().clone())); + println!("adding filter"); + + pipeline.operators.push(tt); + return; + } + if let Some(exec) = p.downcast_ref::() { + make_pipeline(pipeline, exec.input().clone(), store); + println!("adding projection"); + let expr = exec.expr().iter().map(|x| x.0.clone()).collect(); + let schema = exec.schema().clone(); + let tt = Box::new(ProjectionOperator::new(expr, schema)); + pipeline.operators.push(tt); + return; + } + if let Some(exec) = p.downcast_ref::() { + // this function will only be called for probe side + // build side wont have hashjoinexec in make_pipeline call + make_pipeline(pipeline, exec.right().clone(), store); + println!("adding hashprobe"); + let mut hashjoinstream = exec.get_hash_join_stream(0, context).unwrap(); + // using uuid but this value would be present in HashProbeExec itself + let build_map = 
store.remove(pipeline.state.uuid).unwrap(); + let left_data = Arc::new(build_map.get_map()); + let visited_left_side = BooleanBufferBuilder::new(0); + hashjoinstream.build_side = BuildSide::Ready(BuildSideReadyState { + left_data, + visited_left_side, + }); + let tt = Box::new(HashProbeOperator::new(hashjoinstream)); + pipeline.operators.push(tt); + return; + } + if let Some(exec) = p.downcast_ref::() { + make_pipeline(pipeline, exec.input().clone(), store); + return; + } + if let Some(exec) = p.downcast_ref::() { + make_pipeline(pipeline, exec.input().clone(), store); + return; + } + panic!("should never reach the end"); +} +pub trait PhysicalOperator { + fn name(&self) -> String; +} + +//Operators that implement Source trait emit data +pub trait Source: PhysicalOperator { + fn get_data(&mut self) -> Option; +} + +//Physical operators that implement the Operator trait process data +pub trait IntermediateOperator: PhysicalOperator { + //takes an input chunk and outputs another chunk + //for example in Projection Operator we appply the expression to the input chunk and produce the output chunk + fn execute(&mut self, input: &RecordBatch) -> Result; +} diff --git a/vayu/src/pipeline_executor.rs b/vayu/src/pipeline_executor.rs new file mode 100644 index 0000000..c19e6be --- /dev/null +++ b/vayu/src/pipeline_executor.rs @@ -0,0 +1,54 @@ +use crate::pipeline::IntermediateOperator; +use crate::pipeline::Pipeline; +use arrow::array::RecordBatch; +use arrow::error::Result; +use futures::StreamExt; +pub struct PipelineExecutor { + pipeline: Pipeline, +} + +impl PipelineExecutor { + pub fn new(pipeline: Pipeline) -> Self { + PipelineExecutor { pipeline } + } + pub fn execute(&mut self) -> Result> { + let mut results: Vec = vec![]; + if self.pipeline.source.is_none() { + panic!("no source"); + } + let source = self.pipeline.source.as_mut().unwrap(); + + println!("source is present"); + loop { + // read from source until finished. + let data = futures::executor::block_on(source.next()); + if data.is_none() { + break; + } + let data = data.unwrap().unwrap(); + let output = Self::execute_push_internal(&mut self.pipeline.operators, data); + results.push(output) + } + Ok(results) + } + /** + * takes a record batch and passes it through all the operators + * and returns the final record batch. synchronous code. faster. + * no operator can be blocked (for now). 
+ */ + fn execute_push_internal( + operators: &mut Vec>, + mut data: RecordBatch, + ) -> RecordBatch { + for x in operators { + println!( + "running operator {} size {}x{}", + x.name(), + data.num_rows(), + data.num_columns() + ); + data = x.execute(&data).unwrap(); + } + data + } +} diff --git a/vayu/src/sinks.rs b/vayu/src/sinks.rs new file mode 100644 index 0000000..a71a78f --- /dev/null +++ b/vayu/src/sinks.rs @@ -0,0 +1,56 @@ +use datafusion::physical_plan::joins::hash_join; +use datafusion::physical_plan::ExecutionPlan; + +use crate::store::Blob; +use crate::RecordBatch; +use ahash::RandomState; +use arrow::datatypes::Schema; +use datafusion::execution::memory_pool::MemoryConsumer; +use datafusion::physical_expr::PhysicalExprRef; +use datafusion::physical_plan::joins::HashJoinExec; +use datafusion::prelude::SessionContext; +use std::sync::Arc; +pub struct HashMapSink { + pub on_left: Vec, + pub schema: Arc, + pub uuid: i32, +} + +impl HashMapSink { + pub fn new(uuid: i32, plan: Arc) -> Self { + let p = plan.as_any(); + let exec = p.downcast_ref::(); + if exec.is_none() { + panic!("not a join node"); + } + let exec = exec.unwrap(); + let on_left = exec.on().iter().map(|on| on.0.clone()).collect::>(); + HashMapSink { + on_left, + schema: exec.left().schema(), + uuid, + } + } + pub fn build_map(&mut self, result: Vec) -> Option { + let random_state = RandomState::with_seeds(0, 0, 0, 0); + let ctx: SessionContext = SessionContext::new(); + let reservation = + MemoryConsumer::new("HashJoinInput").register(ctx.task_ctx().memory_pool()); + + let hash_map = hash_join::create_hash_build_map( + result, + random_state, + self.on_left.clone(), + self.schema.clone(), + reservation, + ) + .unwrap(); + Some(Blob::HashMapBlob(hash_map)) + } +} + +pub enum SchedulerSinkType { + StoreRecordBatch(i32), + BuildAndStoreHashMap(i32, Arc), + PrintOutput, +} diff --git a/vayu/src/store.rs b/vayu/src/store.rs new file mode 100644 index 0000000..6cd8e08 --- /dev/null +++ b/vayu/src/store.rs @@ -0,0 +1,62 @@ +use arrow::array::RecordBatch; +use datafusion::physical_plan::joins::hash_join::JoinLeftData; + +use core::panic; +use std::collections::HashMap; +pub enum Blob { + RecordBatchBlob(Vec), + HashMapBlob(JoinLeftData), +} + +impl Blob { + pub fn get_map(self) -> JoinLeftData { + match self { + Blob::HashMapBlob(m) => m, + _ => panic!("error"), + } + } + pub fn get_records(self) -> Vec { + match self { + Blob::RecordBatchBlob(records) => records, + _ => panic!("error"), + } + } + pub fn append_records(&mut self, batches: Vec) { + match self { + Blob::RecordBatchBlob(records) => { + // TODO: check if schema is same + records.extend(batches) + } + _ => panic!("error"), + } + } +} + +// right now this is typedef of HashMap, +// but we may need something else in near future + +pub struct Store { + store: HashMap, +} +impl Store { + pub fn new() -> Store { + Store { + store: HashMap::new(), + } + } + pub fn insert(&mut self, key: i32, value: Blob) { + self.store.insert(key, value); + } + pub fn append(&mut self, key: i32, value: Vec) { + let blob = self.remove(key); + let mut blob = match blob { + Some(r) => r, + None => Blob::RecordBatchBlob(Vec::new()), + }; + blob.append_records(value); + self.store.insert(key, blob); + } + pub fn remove(&mut self, key: i32) -> Option { + self.store.remove(&key) + } +}
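
Usage sketch for the engine added in vayu/src/lib.rs: a hypothetical driver that runs a single pipeline (e.g. Scan -> Filter -> Project) and prints the result. It assumes the plan argument is an Arc<dyn ExecutionPlan> (the generic parameters are elided in the hunks above) and that the caller obtains it from DataFusion's planner; the function name run_and_print is illustrative only.

use std::sync::Arc;

use datafusion::physical_plan::ExecutionPlan;
use vayu::sinks::SchedulerSinkType;
use vayu::{SchedulerPipeline, VayuExecutionEngine};

// Hypothetical driver: `plan` is assumed to come from DataFusion's physical planner.
fn run_and_print(plan: Arc<dyn ExecutionPlan>) {
    // One engine per worker; its Store keeps intermediate blobs
    // (record batches, join hash maps) keyed by an i32 uuid.
    let mut engine = VayuExecutionEngine::new();

    // Bundle the physical plan with a sink. PrintOutput pretty-prints the
    // result batches; StoreRecordBatch(uuid) and
    // BuildAndStoreHashMap(uuid, join_node) stash them in the store instead.
    let pipeline = SchedulerPipeline::new(plan, SchedulerSinkType::PrintOutput);
    engine.execute(pipeline);
}

Hash-join plans take two passes over this API: first a build-side pipeline whose sink is BuildAndStoreHashMap, then the probe-side pipeline as above, which picks the hash map back up from the store by uuid when make_pipeline encounters the HashJoinExec node.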