diff --git a/Cargo.toml b/Cargo.toml index a9e9556fc..2a3e66323 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,10 +26,10 @@ arrow-flight = { version = "53", features = ["flight-sql-experimental"] } clap = { version = "4.5", features = ["derive", "cargo"] } configure_me = { version = "0.4.0" } configure_me_codegen = { version = "0.4.4" } -datafusion = "43.0.0" -datafusion-cli = "43.0.0" -datafusion-proto = "43.0.0" -datafusion-proto-common = "43.0.0" +datafusion = "44.0.0" +datafusion-cli = "44.0.0" +datafusion-proto = "44.0.0" +datafusion-proto-common = "44.0.0" object_store = "0.11" prost = "0.13" prost-types = "0.13" diff --git a/ballista/client/tests/context_checks.rs b/ballista/client/tests/context_checks.rs index d3f8fc930..a46f93803 100644 --- a/ballista/client/tests/context_checks.rs +++ b/ballista/client/tests/context_checks.rs @@ -198,6 +198,8 @@ mod supported { "| datafusion | information_schema | columns | VIEW |", "| datafusion | information_schema | df_settings | VIEW |", "| datafusion | information_schema | schemata | VIEW |", + "| datafusion | information_schema | routines | VIEW |", + "| datafusion | information_schema | parameters | VIEW |", "+---------------+--------------------+-------------+------------+", ]; diff --git a/ballista/core/proto/datafusion.proto b/ballista/core/proto/datafusion.proto index 5a044cb43..e27f8af21 100644 --- a/ballista/core/proto/datafusion.proto +++ b/ballista/core/proto/datafusion.proto @@ -59,6 +59,8 @@ message LogicalPlanNode { DistinctOnNode distinct_on = 28; CopyToNode copy_to = 29; UnnestNode unnest = 30; + RecursiveQueryNode recursive_query = 31; + CteWorkTableScanNode cte_work_table_scan = 32; } } @@ -75,6 +77,10 @@ message LogicalExprNodeCollection { repeated LogicalExprNode logical_expr_nodes = 1; } +message SortExprNodeCollection { + repeated SortExprNode sort_expr_nodes = 1; +} + message ListingTableScanNode { reserved 1; // was string table_name TableReference table_name = 14; @@ -90,8 +96,9 @@ message ListingTableScanNode { datafusion_common.CsvFormat csv = 10; datafusion_common.ParquetFormat parquet = 11; datafusion_common.AvroFormat avro = 12; + datafusion_common.NdJsonFormat json = 15; } - repeated LogicalExprNodeCollection file_sort_order = 13; + repeated SortExprNodeCollection file_sort_order = 13; } message ViewTableScanNode { @@ -128,7 +135,7 @@ message SelectionNode { message SortNode { LogicalPlanNode input = 1; - repeated LogicalExprNode expr = 2; + repeated SortExprNode expr = 2; // Maximum number of highest/lowest rows to fetch; negative means no limit int64 fetch = 3; } @@ -158,13 +165,14 @@ message CreateExternalTableNode { datafusion_common.DfSchema schema = 4; repeated string table_partition_cols = 5; bool if_not_exists = 6; + bool temporary = 14; string definition = 7; - repeated LogicalExprNodeCollection order_exprs = 10; + repeated SortExprNodeCollection order_exprs = 10; bool unbounded = 11; map options = 8; datafusion_common.Constraints constraints = 12; map column_defaults = 13; - } +} message PrepareNode { string name = 1; @@ -195,6 +203,7 @@ message CreateViewNode { TableReference name = 5; LogicalPlanNode input = 2; bool or_replace = 3; + bool temporary = 6; string definition = 4; } @@ -244,35 +253,49 @@ message DistinctNode { message DistinctOnNode { repeated LogicalExprNode on_expr = 1; repeated LogicalExprNode select_expr = 2; - repeated LogicalExprNode sort_expr = 3; + repeated SortExprNode sort_expr = 3; LogicalPlanNode input = 4; } message CopyToNode { - LogicalPlanNode input = 1; - string output_url = 2; - oneof format_options { - datafusion_common.CsvOptions csv = 8; - datafusion_common.JsonOptions json = 9; - datafusion_common.TableParquetOptions parquet = 10; - datafusion_common.AvroOptions avro = 11; - datafusion_common.ArrowOptions arrow = 12; - } - repeated string partition_by = 7; + LogicalPlanNode input = 1; + string output_url = 2; + bytes file_type = 3; + repeated string partition_by = 7; } message UnnestNode { - LogicalPlanNode input = 1; - repeated datafusion_common.Column exec_columns = 2; - repeated uint64 list_type_columns = 3; - repeated uint64 struct_type_columns = 4; - repeated uint64 dependency_indices = 5; - datafusion_common.DfSchema schema = 6; - UnnestOptions options = 7; + LogicalPlanNode input = 1; + repeated datafusion_common.Column exec_columns = 2; + repeated ColumnUnnestListItem list_type_columns = 3; + repeated uint64 struct_type_columns = 4; + repeated uint64 dependency_indices = 5; + datafusion_common.DfSchema schema = 6; + UnnestOptions options = 7; +} +message ColumnUnnestListItem { + uint32 input_index = 1; + ColumnUnnestListRecursion recursion = 2; +} + +message ColumnUnnestListRecursions { + repeated ColumnUnnestListRecursion recursions = 2; +} + +message ColumnUnnestListRecursion { + datafusion_common.Column output_column = 1; + uint32 depth = 2; } message UnnestOptions { - bool preserve_nulls = 1; + bool preserve_nulls = 1; + repeated RecursionUnnestOption recursions = 2; +} + +message RecursionUnnestOption { + datafusion_common.Column output_column = 1; + datafusion_common.Column input_column = 2; + uint32 depth = 3; } message UnionNode { @@ -316,8 +339,6 @@ message LogicalExprNode { // binary expressions BinaryExprNode binary_expr = 4; - // aggregate expressions - AggregateExprNode aggregate_expr = 5; // null checks IsNull is_null_expr = 6; @@ -327,7 +348,6 @@ message LogicalExprNode { BetweenNode between = 9; CaseNode case_ = 10; CastNode cast = 11; - SortExprNode sort = 12; NegativeNode negative = 13; InListNode in_list = 14; Wildcard wildcard = 15; @@ -369,7 +389,7 @@ message LogicalExprNode { } message Wildcard { - string qualifier = 1; + TableReference qualifier = 1; } message PlaceholderNode { @@ -471,57 +491,14 @@ message InListNode { bool negated = 3; } -enum AggregateFunction { - MIN = 0; - MAX = 1; - SUM = 2; - AVG = 3; - COUNT = 4; - APPROX_DISTINCT = 5; - ARRAY_AGG = 6; - // VARIANCE = 7; - VARIANCE_POP = 8; - // COVARIANCE = 9; - // COVARIANCE_POP = 10; - STDDEV = 11; - STDDEV_POP = 12; - CORRELATION = 13; - APPROX_PERCENTILE_CONT = 14; - APPROX_MEDIAN = 15; - APPROX_PERCENTILE_CONT_WITH_WEIGHT = 16; - GROUPING = 17; - // MEDIAN = 18; - BIT_AND = 19; - BIT_OR = 20; - BIT_XOR = 21; - BOOL_AND = 22; - BOOL_OR = 23; - REGR_SLOPE = 26; - REGR_INTERCEPT = 27; - REGR_COUNT = 28; - REGR_R2 = 29; - REGR_AVGX = 30; - REGR_AVGY = 31; - REGR_SXX = 32; - REGR_SYY = 33; - REGR_SXY = 34; - STRING_AGG = 35; - NTH_VALUE_AGG = 36; -} - -message AggregateExprNode { - AggregateFunction aggr_function = 1; - repeated LogicalExprNode expr = 2; - bool distinct = 3; - LogicalExprNode filter = 4; - repeated LogicalExprNode order_by = 5; -} message AggregateUDFExprNode { string fun_name = 1; repeated LogicalExprNode args = 2; + bool distinct = 5; LogicalExprNode filter = 3; - repeated LogicalExprNode order_by = 4; + repeated SortExprNode order_by = 4; + optional bytes fun_definition = 6; } message ScalarUDFExprNode { @@ -530,32 +507,18 @@ message ScalarUDFExprNode { optional bytes fun_definition = 3; } -enum BuiltInWindowFunction { - ROW_NUMBER = 0; - RANK = 1; - DENSE_RANK = 2; - PERCENT_RANK = 3; - CUME_DIST = 4; - NTILE = 5; - LAG = 6; - LEAD = 7; - FIRST_VALUE = 8; - LAST_VALUE = 9; - NTH_VALUE = 10; -} - message WindowExprNode { oneof window_function { - AggregateFunction aggr_function = 1; - BuiltInWindowFunction built_in_function = 2; + // BuiltInWindowFunction built_in_function = 2; string udaf = 3; string udwf = 9; } - LogicalExprNode expr = 4; + repeated LogicalExprNode exprs = 4; repeated LogicalExprNode partition_by = 5; - repeated LogicalExprNode order_by = 6; + repeated SortExprNode order_by = 6; // repeated LogicalExprNode filter = 7; WindowFrame window_frame = 8; + optional bytes fun_definition = 10; } message BetweenNode { @@ -674,9 +637,12 @@ message PlanType { datafusion_common.EmptyMessage FinalLogicalPlan = 3; datafusion_common.EmptyMessage InitialPhysicalPlan = 4; datafusion_common.EmptyMessage InitialPhysicalPlanWithStats = 9; + datafusion_common.EmptyMessage InitialPhysicalPlanWithSchema = 11; OptimizedPhysicalPlanType OptimizedPhysicalPlan = 5; datafusion_common.EmptyMessage FinalPhysicalPlan = 6; datafusion_common.EmptyMessage FinalPhysicalPlanWithStats = 10; + datafusion_common.EmptyMessage FinalPhysicalPlanWithSchema = 12; + datafusion_common.EmptyMessage PhysicalPlanError = 13; } } @@ -737,10 +703,11 @@ message PhysicalPlanNode { AnalyzeExecNode analyze = 23; JsonSinkExecNode json_sink = 24; SymmetricHashJoinExecNode symmetric_hash_join = 25; - InterleaveExecNode interleave = 26; + InterleaveExecNode interleave = 26; PlaceholderRowExecNode placeholder_row = 27; CsvSinkExecNode csv_sink = 28; ParquetSinkExecNode parquet_sink = 29; + UnnestExecNode unnest = 30; } } @@ -752,13 +719,21 @@ message PartitionColumn { message FileSinkConfig { reserved 6; // writer_mode + reserved 8; // was `overwrite` which has been superseded by `insert_op` string object_store_url = 1; repeated PartitionedFile file_groups = 2; repeated string table_paths = 3; datafusion_common.Schema output_schema = 4; repeated PartitionColumn table_partition_cols = 5; - bool overwrite = 8; + bool keep_partition_by_columns = 9; + InsertOp insert_op = 10; +} + +enum InsertOp { + Append = 0; + Overwrite = 1; + Replace = 2; } message JsonSink { @@ -797,6 +772,19 @@ message ParquetSinkExecNode { PhysicalSortExprNodeCollection sort_order = 4; } +message UnnestExecNode { + PhysicalPlanNode input = 1; + datafusion_common.Schema schema = 2; + repeated ListUnnest list_type_columns = 3; + repeated uint64 struct_type_columns = 4; + UnnestOptions options = 5; +} + +message ListUnnest { + uint32 index_in_input_schema = 1; + uint32 depth = 2; +} + message PhysicalExtensionNode { bytes node = 1; repeated PhysicalPlanNode inputs = 2; @@ -838,6 +826,10 @@ message PhysicalExprNode { // was PhysicalDateTimeIntervalExprNode date_time_interval_expr = 17; PhysicalLikeExprNode like_expr = 18; + + PhysicalExtensionExprNode extension = 19; + + UnknownColumn unknown_column = 20; } } @@ -846,29 +838,32 @@ message PhysicalScalarUdfNode { repeated PhysicalExprNode args = 2; optional bytes fun_definition = 3; datafusion_common.ArrowType return_type = 4; + bool nullable = 5; } message PhysicalAggregateExprNode { oneof AggregateFunction { - AggregateFunction aggr_function = 1; string user_defined_aggr_function = 4; } repeated PhysicalExprNode expr = 2; repeated PhysicalSortExprNode ordering_req = 5; bool distinct = 3; + bool ignore_nulls = 6; + optional bytes fun_definition = 7; } message PhysicalWindowExprNode { oneof window_function { - AggregateFunction aggr_function = 1; - BuiltInWindowFunction built_in_function = 2; + // BuiltInWindowFunction built_in_function = 2; string user_defined_aggr_function = 3; + string user_defined_window_function = 10; } repeated PhysicalExprNode args = 4; repeated PhysicalExprNode partition_by = 5; repeated PhysicalSortExprNode order_by = 6; WindowFrame window_frame = 7; string name = 8; + optional bytes fun_definition = 9; } message PhysicalIsNull { @@ -944,10 +939,16 @@ message PhysicalNegativeNode { PhysicalExprNode expr = 1; } +message PhysicalExtensionExprNode { + bytes expr = 1; + repeated PhysicalExprNode inputs = 2; +} + message FilterExecNode { PhysicalPlanNode input = 1; PhysicalExprNode expr = 2; uint32 default_filter_selectivity = 3; + repeated uint32 projection = 9; } message FileGroup { @@ -994,6 +995,10 @@ message CsvScanExecNode { oneof optional_escape { string escape = 5; } + oneof optional_comment { + string comment = 6; + } + bool newlines_in_values = 7; } message AvroScanExecNode { @@ -1065,6 +1070,10 @@ message PhysicalColumn { uint32 index = 2; } +message UnknownColumn { + string name = 1; +} + message JoinOn { PhysicalExprNode left = 1; PhysicalExprNode right = 2; @@ -1174,6 +1183,7 @@ message NestedLoopJoinExecNode { message CoalesceBatchesExecNode { PhysicalPlanNode input = 1; uint32 target_batch_size = 2; + optional uint32 fetch = 3; } message CoalescePartitionsExecNode { @@ -1233,4 +1243,16 @@ message PartitionStats { int64 num_batches = 2; int64 num_bytes = 3; repeated datafusion_common.ColumnStats column_stats = 4; +} + +message RecursiveQueryNode { + string name = 1; + LogicalPlanNode static_term = 2; + LogicalPlanNode recursive_term = 3; + bool is_distinct = 4; +} + +message CteWorkTableScanNode { + string name = 1; + datafusion_common.Schema schema = 2; } \ No newline at end of file diff --git a/ballista/core/proto/datafusion_common.proto b/ballista/core/proto/datafusion_common.proto index 94490ec24..ec089e43d 100644 --- a/ballista/core/proto/datafusion_common.proto +++ b/ballista/core/proto/datafusion_common.proto @@ -421,10 +421,11 @@ message CsvOptions { string timestamp_tz_format = 10; // Optional timestamp with timezone format string time_format = 11; // Optional time format string null_value = 12; // Optional representation of null value - bytes comment = 13; // Optional comment character as a byte - bytes double_quote = 14; // Indicates if quotes are doubled - bytes newlines_in_values = 15; // Indicates if newlines are supported in values - bytes terminator = 16; // Optional terminator character as a byte + string null_regex = 13; // Optional representation of null loading regex + bytes comment = 14; // Optional comment character as a byte + bytes double_quote = 15; // Indicates if quotes are doubled + bytes newlines_in_values = 16; // Indicates if newlines are supported in values + bytes terminator = 17; // Optional terminator character as a byte } // Options controlling CSV format diff --git a/ballista/core/src/execution_plans/distributed_query.rs b/ballista/core/src/execution_plans/distributed_query.rs index 785d3b0cb..39e60d9ce 100644 --- a/ballista/core/src/execution_plans/distributed_query.rs +++ b/ballista/core/src/execution_plans/distributed_query.rs @@ -32,8 +32,8 @@ use datafusion::logical_expr::LogicalPlan; use datafusion::physical_expr::EquivalenceProperties; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::{ - DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, - PlanProperties, SendableRecordBatchStream, Statistics, + DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, + SendableRecordBatchStream, Statistics, }; use datafusion_proto::logical_plan::{ AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec, @@ -129,7 +129,8 @@ impl DistributedQueryExec { PlanProperties::new( EquivalenceProperties::new(schema), Partitioning::UnknownPartitioning(1), - ExecutionMode::Bounded, + datafusion::physical_plan::execution_plan::EmissionType::Incremental, + datafusion::physical_plan::execution_plan::Boundedness::Bounded, ) } } diff --git a/ballista/core/src/execution_plans/shuffle_reader.rs b/ballista/core/src/execution_plans/shuffle_reader.rs index f50d6a291..7a20f1215 100644 --- a/ballista/core/src/execution_plans/shuffle_reader.rs +++ b/ballista/core/src/execution_plans/shuffle_reader.rs @@ -79,7 +79,8 @@ impl ShuffleReaderExec { let properties = PlanProperties::new( datafusion::physical_expr::EquivalenceProperties::new(schema.clone()), partitioning, - datafusion::physical_plan::ExecutionMode::Bounded, + datafusion::physical_plan::execution_plan::EmissionType::Incremental, + datafusion::physical_plan::execution_plan::Boundedness::Bounded, ); Ok(Self { stage_id, diff --git a/ballista/core/src/execution_plans/shuffle_writer.rs b/ballista/core/src/execution_plans/shuffle_writer.rs index 9077fd9d6..23b437f68 100644 --- a/ballista/core/src/execution_plans/shuffle_writer.rs +++ b/ballista/core/src/execution_plans/shuffle_writer.rs @@ -135,7 +135,8 @@ impl ShuffleWriterExec { let properties = PlanProperties::new( datafusion::physical_expr::EquivalenceProperties::new(plan.schema()), partitioning, - datafusion::physical_plan::ExecutionMode::Bounded, + datafusion::physical_plan::execution_plan::EmissionType::Incremental, + datafusion::physical_plan::execution_plan::Boundedness::Bounded, ); Ok(Self { job_id, diff --git a/ballista/core/src/execution_plans/unresolved_shuffle.rs b/ballista/core/src/execution_plans/unresolved_shuffle.rs index 9d4d3077d..75a168e0c 100644 --- a/ballista/core/src/execution_plans/unresolved_shuffle.rs +++ b/ballista/core/src/execution_plans/unresolved_shuffle.rs @@ -50,7 +50,8 @@ impl UnresolvedShuffleExec { let properties = PlanProperties::new( datafusion::physical_expr::EquivalenceProperties::new(schema.clone()), partitioning, - datafusion::physical_plan::ExecutionMode::Bounded, + datafusion::physical_plan::execution_plan::EmissionType::Incremental, + datafusion::physical_plan::execution_plan::Boundedness::Bounded, ); Self { stage_id, diff --git a/ballista/core/src/extension.rs b/ballista/core/src/extension.rs index 25bdbad92..41d160299 100644 --- a/ballista/core/src/extension.rs +++ b/ballista/core/src/extension.rs @@ -23,7 +23,7 @@ use crate::serde::protobuf::KeyValuePair; use crate::serde::{BallistaLogicalExtensionCodec, BallistaPhysicalExtensionCodec}; use crate::utils::BallistaQueryPlanner; use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionState}; -use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; +use datafusion::execution::runtime_env::RuntimeEnvBuilder; use datafusion::execution::session_state::SessionStateBuilder; use datafusion_proto::logical_plan::LogicalExtensionCodec; use datafusion_proto::physical_plan::PhysicalExtensionCodec; @@ -132,8 +132,7 @@ impl SessionStateExt for SessionState { // Ballista disables this option .with_round_robin_repartition(false); - let runtime_config = RuntimeConfig::default(); - let runtime_env = RuntimeEnv::try_new(runtime_config)?; + let runtime_env = RuntimeEnvBuilder::new().build()?; let session_state = SessionStateBuilder::new() .with_default_features() .with_config(session_config) diff --git a/ballista/core/src/utils.rs b/ballista/core/src/utils.rs index 913e955d3..a0dbc451c 100644 --- a/ballista/core/src/utils.rs +++ b/ballista/core/src/utils.rs @@ -32,7 +32,7 @@ use datafusion::arrow::record_batch::RecordBatch; use datafusion::common::tree_node::{TreeNode, TreeNodeVisitor}; use datafusion::error::DataFusionError; use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionState}; -use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; +use datafusion::execution::runtime_env::RuntimeEnvBuilder; use datafusion::execution::session_state::SessionStateBuilder; use datafusion::logical_expr::{DdlStatement, LogicalPlan, TableScan}; use datafusion::physical_plan::empty::EmptyExec; @@ -56,7 +56,7 @@ pub fn default_session_builder( Ok(SessionStateBuilder::new() .with_default_features() .with_config(config) - .with_runtime_env(Arc::new(RuntimeEnv::try_new(RuntimeConfig::default())?)) + .with_runtime_env(Arc::new(RuntimeEnvBuilder::new().build()?)) .build()) } @@ -317,17 +317,14 @@ mod test { use datafusion::{ common::tree_node::TreeNode, error::Result, - execution::{ - runtime_env::{RuntimeConfig, RuntimeEnv}, - SessionStateBuilder, - }, + execution::{runtime_env::RuntimeEnvBuilder, SessionStateBuilder}, prelude::{SessionConfig, SessionContext}, }; use crate::utils::LocalRun; fn context() -> SessionContext { - let runtime_environment = RuntimeEnv::try_new(RuntimeConfig::new()).unwrap(); + let runtime_environment = RuntimeEnvBuilder::new().build().unwrap(); let session_config = SessionConfig::new().with_information_schema(true); diff --git a/ballista/executor/src/collect.rs b/ballista/executor/src/collect.rs index 1d77e7198..3d7d89f90 100644 --- a/ballista/executor/src/collect.rs +++ b/ballista/executor/src/collect.rs @@ -46,7 +46,8 @@ impl CollectExec { let properties = PlanProperties::new( datafusion::physical_expr::EquivalenceProperties::new(plan.schema()), Partitioning::UnknownPartitioning(1), - datafusion::physical_plan::ExecutionMode::Bounded, + datafusion::physical_plan::execution_plan::EmissionType::Incremental, + datafusion::physical_plan::execution_plan::Boundedness::Bounded, ); Self { plan, properties } } diff --git a/ballista/executor/src/executor.rs b/ballista/executor/src/executor.rs index 1b029e171..3eb762d75 100644 --- a/ballista/executor/src/executor.rs +++ b/ballista/executor/src/executor.rs @@ -272,7 +272,8 @@ mod test { Schema::empty(), )), Partitioning::UnknownPartitioning(1), - datafusion::physical_plan::ExecutionMode::Bounded, + datafusion::physical_plan::execution_plan::EmissionType::Incremental, + datafusion::physical_plan::execution_plan::Boundedness::Bounded, ), } } diff --git a/ballista/executor/src/executor_process.rs b/ballista/executor/src/executor_process.rs index fac02b48d..bfd76b059 100644 --- a/ballista/executor/src/executor_process.rs +++ b/ballista/executor/src/executor_process.rs @@ -37,7 +37,7 @@ use tokio::task::JoinHandle; use tokio::{fs, time}; use uuid::Uuid; -use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; +use datafusion::execution::runtime_env::RuntimeEnvBuilder; use ballista_core::config::{LogRotationPolicy, TaskSchedulingPolicy}; use ballista_core::error::BallistaError; @@ -203,8 +203,10 @@ pub async fn start_executor_process( let runtime_producer: RuntimeProducer = opt.override_runtime_producer.clone().unwrap_or_else(|| { Arc::new(move |_| { - let config = RuntimeConfig::new().with_temp_file_path(wd.clone()); - Ok(Arc::new(RuntimeEnv::try_new(config)?)) + let runtime_env = RuntimeEnvBuilder::new() + .with_temp_file_path(wd.clone()) + .build()?; + Ok(Arc::new(runtime_env)) }) }); diff --git a/ballista/executor/src/standalone.rs b/ballista/executor/src/standalone.rs index 57082fc2c..b16e4a3a0 100644 --- a/ballista/executor/src/standalone.rs +++ b/ballista/executor/src/standalone.rs @@ -31,7 +31,7 @@ use ballista_core::{ BALLISTA_VERSION, }; use ballista_core::{ConfigProducer, RuntimeProducer}; -use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; +use datafusion::execution::runtime_env::RuntimeEnvBuilder; use datafusion::execution::SessionState; use datafusion::prelude::SessionConfig; use datafusion_proto::logical_plan::AsLogicalPlan; @@ -184,8 +184,10 @@ pub async fn new_standalone_executor< let config_producer = Arc::new(default_config_producer); let wd = work_dir.clone(); let runtime_producer: RuntimeProducer = Arc::new(move |_: &SessionConfig| { - let config = RuntimeConfig::new().with_temp_file_path(wd.clone()); - Ok(Arc::new(RuntimeEnv::try_new(config)?)) + let runtime_env = RuntimeEnvBuilder::new() + .with_temp_file_path(wd.clone()) + .build()?; + Ok(Arc::new(runtime_env)) }); let executor = Arc::new(Executor::new_basic( diff --git a/ballista/scheduler/src/cluster/mod.rs b/ballista/scheduler/src/cluster/mod.rs index c54b0ceae..5be42ee79 100644 --- a/ballista/scheduler/src/cluster/mod.rs +++ b/ballista/scheduler/src/cluster/mod.rs @@ -1030,6 +1030,7 @@ mod test { range: None, extensions: None, statistics: None, + metadata_size_hint: None, }]); } vec![scan_files] diff --git a/ballista/scheduler/src/planner.rs b/ballista/scheduler/src/planner.rs index fc32262e0..a01267091 100644 --- a/ballista/scheduler/src/planner.rs +++ b/ballista/scheduler/src/planner.rs @@ -687,7 +687,7 @@ order by assert_eq!(Some(&Column::new("l_shipmode", 1)), partition_by); assert_eq!(InputOrderMode::Sorted, window.input_order_mode); let sort = downcast_exec!(window.children()[0], SortExec); - match sort.expr() { + match &sort.expr().inner[..] { [expr1, expr2] => { assert_eq!( SortOptions { diff --git a/examples/src/object_store.rs b/examples/src/object_store.rs index 5b5e38a6a..0b2b9e159 100644 --- a/examples/src/object_store.rs +++ b/examples/src/object_store.rs @@ -30,14 +30,12 @@ use datafusion::config::{ }; use datafusion::error::Result; use datafusion::execution::object_store::ObjectStoreRegistry; +use datafusion::execution::runtime_env::RuntimeEnvBuilder; use datafusion::execution::SessionState; use datafusion::prelude::SessionConfig; use datafusion::{ error::DataFusionError, - execution::{ - runtime_env::{RuntimeConfig, RuntimeEnv}, - SessionStateBuilder, - }, + execution::{runtime_env::RuntimeEnv, SessionStateBuilder}, }; use object_store::aws::AmazonS3Builder; use object_store::local::LocalFileSystem; @@ -75,11 +73,13 @@ pub fn custom_runtime_env_with_s3_support( "S3 Options not set".to_string(), ))?; - let config = RuntimeConfig::new().with_object_store_registry(Arc::new( - CustomObjectStoreRegistry::new(s3options.clone()), - )); + let runtime_env = RuntimeEnvBuilder::new() + .with_object_store_registry(Arc::new(CustomObjectStoreRegistry::new( + s3options.clone(), + ))) + .build()?; - Ok(Arc::new(RuntimeEnv::try_new(config)?)) + Ok(Arc::new(runtime_env)) } /// Custom [SessionState] constructor method