diff --git a/datafusion-cli/src/functions.rs b/datafusion-cli/src/functions.rs
index 36e68ec4842b..4400d32a8b50 100644
--- a/datafusion-cli/src/functions.rs
+++ b/datafusion-cli/src/functions.rs
@@ -244,6 +244,7 @@ impl TableProvider for ParquetMetadataTable {
         Ok(Arc::new(MemoryExec::try_new(
             &[vec![self.batch.clone()]],
             TableProvider::schema(self),
+            None,
             projection.cloned(),
         )?))
     }
diff --git a/datafusion-examples/examples/remote_catalog.rs b/datafusion-examples/examples/remote_catalog.rs
index 206b7ba9c4be..ebab27486622 100644
--- a/datafusion-examples/examples/remote_catalog.rs
+++ b/datafusion-examples/examples/remote_catalog.rs
@@ -339,6 +339,7 @@ impl TableProvider for RemoteTable {
         Ok(Arc::new(MemoryExec::try_new(
             &[batches],
             self.schema.clone(),
+            None,
             projection.cloned(),
         )?))
     }
diff --git a/datafusion-examples/examples/simple_udtf.rs b/datafusion-examples/examples/simple_udtf.rs
index 7cf1ce87690e..936dcde28e42 100644
--- a/datafusion-examples/examples/simple_udtf.rs
+++ b/datafusion-examples/examples/simple_udtf.rs
@@ -123,6 +123,7 @@ impl TableProvider for LocalCsvTable {
         Ok(Arc::new(MemoryExec::try_new(
             &[batches],
             TableProvider::schema(self),
+            None,
             projection.cloned(),
         )?))
     }
diff --git a/datafusion/core/benches/physical_plan.rs b/datafusion/core/benches/physical_plan.rs
index 7d87a37b3b9c..983989d69bcd 100644
--- a/datafusion/core/benches/physical_plan.rs
+++ b/datafusion/core/benches/physical_plan.rs
@@ -59,6 +59,7 @@ fn sort_preserving_merge_operator(
         &batches.into_iter().map(|rb| vec![rb]).collect::<Vec<_>>(),
         schema,
         None,
+        None,
     )
     .unwrap();
     let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec)));
diff --git a/datafusion/core/benches/sort.rs b/datafusion/core/benches/sort.rs
index 14e80ce364e3..e2f1fc955d13 100644
--- a/datafusion/core/benches/sort.rs
+++ b/datafusion/core/benches/sort.rs
@@ -167,7 +167,7 @@ impl BenchCase {
         let schema = partitions[0][0].schema();
         let sort = make_sort_exprs(schema.as_ref());

-        let exec = MemoryExec::try_new(partitions, schema, None).unwrap();
+        let exec = MemoryExec::try_new(partitions, schema, None, None).unwrap();
         let plan = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec)));

         Self {
@@ -186,7 +186,7 @@ impl BenchCase {
         let schema = partitions[0][0].schema();
         let sort = make_sort_exprs(schema.as_ref());

-        let exec = MemoryExec::try_new(partitions, schema, None).unwrap();
+        let exec = MemoryExec::try_new(partitions, schema, None, None).unwrap();
         let exec = SortExec::new(sort.clone(), Arc::new(exec)).with_preserve_partitioning(true);
         let plan = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec)));
@@ -208,7 +208,7 @@ impl BenchCase {
         let schema = partitions[0][0].schema();
         let sort = make_sort_exprs(schema.as_ref());

-        let exec = MemoryExec::try_new(partitions, schema, None).unwrap();
+        let exec = MemoryExec::try_new(partitions, schema, None, None).unwrap();
         let exec = Arc::new(CoalescePartitionsExec::new(Arc::new(exec)));
         let plan = Arc::new(SortExec::new(sort, exec));
@@ -229,7 +229,7 @@ impl BenchCase {
         let schema = partitions[0][0].schema();
         let sort = make_sort_exprs(schema.as_ref());

-        let exec = MemoryExec::try_new(partitions, schema, None).unwrap();
+        let exec = MemoryExec::try_new(partitions, schema, None, None).unwrap();
         let exec = SortExec::new(sort, Arc::new(exec)).with_preserve_partitioning(true);
         let plan = Arc::new(CoalescePartitionsExec::new(Arc::new(exec)));
diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs
index c1e0bea0b3ff..d984230c1035 100644
--- a/datafusion/core/src/datasource/memory.rs
+++ b/datafusion/core/src/datasource/memory.rs
@@ -132,6 +132,7 @@ impl MemTable {
         state: &SessionState,
     ) -> Result<Self> {
         let schema = t.schema();
+        let constraints = t.constraints();
         let exec = t.scan(state, None, &[], None).await?;
         let partition_count = exec.output_partitioning().partition_count();
@@ -162,7 +163,8 @@ impl MemTable {
             }
         }

-        let exec = MemoryExec::try_new(&data, Arc::clone(&schema), None)?;
+        let exec =
+            MemoryExec::try_new(&data, Arc::clone(&schema), constraints.cloned(), None)?;

         if let Some(num_partitions) = output_partitions {
             let exec = RepartitionExec::try_new(
@@ -220,8 +222,12 @@ impl TableProvider for MemTable {
             partitions.push(inner_vec.clone())
         }

-        let mut exec =
-            MemoryExec::try_new(&partitions, self.schema(), projection.cloned())?;
+        let mut exec = MemoryExec::try_new(
+            &partitions,
+            self.schema(),
+            self.constraints().cloned(),
+            projection.cloned(),
+        )?;

         let show_sizes = state.config_options().explain.show_sizes;
         exec = exec.with_show_sizes(show_sizes);
diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
index d2d35c3877c1..c06e332284f2 100644
--- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs
+++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
@@ -216,6 +216,7 @@ fn try_swapping_with_memory(
         MemoryExec::try_new(
             memory.partitions(),
             memory.original_schema(),
+            Some(memory.constraints().to_owned()),
             Some(new_projections),
         )
         .map(|e| Arc::new(e) as _)
@@ -1770,7 +1771,7 @@ mod tests {
             Field::new("e", DataType::Int32, true),
         ]));

-        Arc::new(MemoryExec::try_new(&[], schema, Some(vec![2, 0, 3, 4])).unwrap())
+        Arc::new(MemoryExec::try_new(&[], schema, None, Some(vec![2, 0, 3, 4])).unwrap())
     }

     #[test]
diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs b/datafusion/core/src/physical_optimizer/test_utils.rs
index 9156301393c0..acecf776ca1d 100644
--- a/datafusion/core/src/physical_optimizer/test_utils.rs
+++ b/datafusion/core/src/physical_optimizer/test_utils.rs
@@ -216,7 +216,7 @@ pub fn coalesce_partitions_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn Execu
 }

 pub fn memory_exec(schema: &SchemaRef) -> Arc<dyn ExecutionPlan> {
-    Arc::new(MemoryExec::try_new(&[vec![]], schema.clone(), None).unwrap())
+    Arc::new(MemoryExec::try_new(&[vec![]], schema.clone(), None, None).unwrap())
 }

 pub fn hash_join_exec(
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 47b31d2f4e2d..f5b817cae3d1 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -1940,7 +1940,7 @@ impl DefaultPhysicalPlanner {
         let schema = record_batch.schema();
         let partitions = vec![vec![record_batch]];
         let projection = None;
-        let mem_exec = MemoryExec::try_new(&partitions, schema, projection)?;
+        let mem_exec = MemoryExec::try_new(&partitions, schema, None, projection)?;
         Ok(Arc::new(mem_exec))
     }
diff --git a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
index 09d0c8d5ca2e..8b23e2a62623 100644
--- a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
@@ -302,11 +302,12 @@ async fn run_aggregate_test(input1: Vec<RecordBatch>, group_by_columns: Vec<&str
     let concat_input_record = concat_batches(&schema, &input1).unwrap();

     let usual_source = Arc::new(
-        MemoryExec::try_new(&[vec![concat_input_record]], schema.clone(), None).unwrap(),
+        MemoryExec::try_new(&[vec![concat_input_record]], schema.clone(), None, None)
+            .unwrap(),
     );

     let running_source = Arc::new(
-        MemoryExec::try_new(&[input1.clone()], schema.clone(), None)
+        MemoryExec::try_new(&[input1.clone()], schema.clone(), None, None)
             .unwrap()
             .try_with_sort_information(vec![sort_keys])
             .unwrap(),
diff --git a/datafusion/core/tests/fuzz_cases/join_fuzz.rs b/datafusion/core/tests/fuzz_cases/join_fuzz.rs
index cf1742a30e66..b825b224f6df 100644
--- a/datafusion/core/tests/fuzz_cases/join_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/join_fuzz.rs
@@ -404,10 +404,12 @@ impl JoinFuzzTestCase {
     fn left_right(&self) -> (Arc<dyn ExecutionPlan>, Arc<dyn ExecutionPlan>) {
         let schema1 = self.input1[0].schema();
         let schema2 = self.input2[0].schema();
-        let left =
-            Arc::new(MemoryExec::try_new(&[self.input1.clone()], schema1, None).unwrap());
-        let right =
-            Arc::new(MemoryExec::try_new(&[self.input2.clone()], schema2, None).unwrap());
+        let left = Arc::new(
+            MemoryExec::try_new(&[self.input1.clone()], schema1, None, None).unwrap(),
+        );
+        let right = Arc::new(
+            MemoryExec::try_new(&[self.input2.clone()], schema2, None, None).unwrap(),
+        );
         (left, right)
     }
diff --git a/datafusion/core/tests/fuzz_cases/merge_fuzz.rs b/datafusion/core/tests/fuzz_cases/merge_fuzz.rs
index 4e895920dd3d..e062e902a5bb 100644
--- a/datafusion/core/tests/fuzz_cases/merge_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/merge_fuzz.rs
@@ -116,7 +116,7 @@ async fn run_merge_test(input: Vec<Vec<RecordBatch>>) {
         },
     }]);

-    let exec = MemoryExec::try_new(&input, schema, None).unwrap();
+    let exec = MemoryExec::try_new(&input, schema, None, None).unwrap();
     let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec)));

     let session_config = SessionConfig::new().with_batch_size(batch_size);
diff --git a/datafusion/core/tests/fuzz_cases/sort_fuzz.rs b/datafusion/core/tests/fuzz_cases/sort_fuzz.rs
index 19ffa69f11d3..22a309221fc5 100644
--- a/datafusion/core/tests/fuzz_cases/sort_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/sort_fuzz.rs
@@ -123,7 +123,7 @@ impl SortTest {
             },
         }]);

-        let exec = MemoryExec::try_new(&input, schema, None).unwrap();
+        let exec = MemoryExec::try_new(&input, schema, None, None).unwrap();
         let sort = Arc::new(SortExec::new(sort, Arc::new(exec)));

         let session_config = SessionConfig::new();
diff --git a/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs b/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs
index daa282c8fe4a..a8b4f9364ca3 100644
--- a/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs
@@ -357,7 +357,7 @@ mod sp_repartition_fuzz_tests {
         let concat_input_record = concat_batches(&schema, &input1).unwrap();

         let running_source = Arc::new(
-            MemoryExec::try_new(&[input1.clone()], schema.clone(), None)
+            MemoryExec::try_new(&[input1.clone()], schema.clone(), None, None)
                 .unwrap()
                 .try_with_sort_information(vec![sort_keys.clone()])
                 .unwrap(),
diff --git a/datafusion/core/tests/fuzz_cases/window_fuzz.rs b/datafusion/core/tests/fuzz_cases/window_fuzz.rs
index 67666f5d7a1c..7efbca9e4801 100644
--- a/datafusion/core/tests/fuzz_cases/window_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/window_fuzz.rs
@@ -163,6 +163,7 @@ async fn bounded_window_causal_non_causal() -> Result<()> {
         &[batches.clone()],
         schema.clone(),
         None,
+        None,
     )?);

     // Different window functions to test causality
@@ -639,7 +640,7 @@ async fn run_window_test(
         },
     ]);
     let mut exec1 = Arc::new(
-        MemoryExec::try_new(&[vec![concat_input_record]], schema.clone(), None)?
+        MemoryExec::try_new(&[vec![concat_input_record]], schema.clone(), None, None)?
             .try_with_sort_information(vec![source_sort_keys.clone()])?,
     ) as _;
     // Table is ordered according to ORDER BY a, b, c In linear test we use PARTITION BY b, ORDER BY a
@@ -665,7 +666,7 @@ async fn run_window_test(
         vec![],
     )?) as _;
     let exec2 = Arc::new(
-        MemoryExec::try_new(&[input1.clone()], schema.clone(), None)?
+        MemoryExec::try_new(&[input1.clone()], schema.clone(), None, None)?
             .try_with_sort_information(vec![source_sort_keys.clone()])?,
     );
     let running_window_exec = Arc::new(BoundedWindowAggExec::try_new(
diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs
index c7514d1c24b1..47ac885be1e4 100644
--- a/datafusion/core/tests/memory_limit/mod.rs
+++ b/datafusion/core/tests/memory_limit/mod.rs
@@ -843,7 +843,7 @@ impl TableProvider for SortedTableProvider {
         _limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let mem_exec =
-            MemoryExec::try_new(&self.batches, self.schema(), projection.cloned())?
+            MemoryExec::try_new(&self.batches, self.schema(), None, projection.cloned())?
                 .try_with_sort_information(self.sort_information.clone())?;

         Ok(Arc::new(mem_exec))
diff --git a/datafusion/core/tests/physical_optimizer/limited_distinct_aggregation.rs b/datafusion/core/tests/physical_optimizer/limited_distinct_aggregation.rs
index 6910db6285a3..0f8e17d8dde1 100644
--- a/datafusion/core/tests/physical_optimizer/limited_distinct_aggregation.rs
+++ b/datafusion/core/tests/physical_optimizer/limited_distinct_aggregation.rs
@@ -81,6 +81,7 @@ fn mock_data() -> Result<Arc<MemoryExec>> {
         &[vec![batch]],
         Arc::clone(&schema),
         None,
+        None,
     )?))
 }
diff --git a/datafusion/core/tests/user_defined/user_defined_table_functions.rs b/datafusion/core/tests/user_defined/user_defined_table_functions.rs
index b5f94107dd0b..a7a0d10f3df9 100644
--- a/datafusion/core/tests/user_defined/user_defined_table_functions.rs
+++ b/datafusion/core/tests/user_defined/user_defined_table_functions.rs
@@ -156,6 +156,7 @@ impl TableProvider for SimpleCsvTable {
         Ok(Arc::new(MemoryExec::try_new(
             &[batches],
             TableProvider::schema(self),
+            None,
             projection.cloned(),
         )?))
     }
diff --git a/datafusion/physical-optimizer/src/aggregate_statistics.rs b/datafusion/physical-optimizer/src/aggregate_statistics.rs
index a00bc4b1d571..379892063508 100644
--- a/datafusion/physical-optimizer/src/aggregate_statistics.rs
+++ b/datafusion/physical-optimizer/src/aggregate_statistics.rs
@@ -252,6 +252,7 @@ mod tests {
             &[vec![batch]],
             Arc::clone(&schema),
             None,
+            None,
         )?))
     }
diff --git a/datafusion/physical-plan/benches/spm.rs b/datafusion/physical-plan/benches/spm.rs
index fbbd27409173..304ef9312957 100644
--- a/datafusion/physical-plan/benches/spm.rs
+++ b/datafusion/physical-plan/benches/spm.rs
@@ -82,7 +82,7 @@ fn generate_spm_for_round_robin_tie_breaker(
         },
     ]);

-    let exec = MemoryExec::try_new(&partitiones, schema, None).unwrap();
+    let exec = MemoryExec::try_new(&partitiones, schema, None, None).unwrap();
     SortPreservingMergeExec::new(sort, Arc::new(exec))
         .with_round_robin_repartition(enable_round_robin_repartition)
 }
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs
index c04211d679ca..50b883616dbd 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -2185,6 +2185,7 @@ mod tests {
             ],
             Arc::clone(&schema),
             None,
+            None,
         )?);
         let aggregate_exec = Arc::new(AggregateExec::try_new(
             AggregateMode::Partial,
@@ -2415,6 +2416,7 @@ mod tests {
             &[input_batches],
             Arc::clone(&schema),
             None,
+            None,
         )?);

         let aggregate_exec = Arc::new(AggregateExec::try_new(
@@ -2538,6 +2540,7 @@ mod tests {
             &[vec![batch.clone()]],
             Arc::<Schema>::clone(&batch.schema()),
             None,
+            None,
         )?);
         let aggregate_exec = Arc::new(AggregateExec::try_new(
             AggregateMode::FinalPartitioned,
@@ -2607,6 +2610,7 @@ mod tests {
             &[input_data],
             Arc::clone(&schema),
             None,
+            None,
         )?);
         let aggregate_exec = Arc::new(AggregateExec::try_new(
             AggregateMode::Partial,
@@ -2697,6 +2701,7 @@ mod tests {
             &[input_data],
             Arc::clone(&schema),
             None,
+            None,
         )?);
         let aggregate_exec = Arc::new(AggregateExec::try_new(
             AggregateMode::Partial,
diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs
index a0fe0bd116ee..f881f7a1626b 100644
--- a/datafusion/physical-plan/src/joins/hash_join.rs
+++ b/datafusion/physical-plan/src/joins/hash_join.rs
@@ -1666,7 +1666,7 @@ mod tests {
     ) -> Arc<dyn ExecutionPlan> {
         let batch = build_table_i32(a, b, c);
         let schema = batch.schema();
-        Arc::new(MemoryExec::try_new(&[vec![batch]], schema, None).unwrap())
+        Arc::new(MemoryExec::try_new(&[vec![batch]], schema, None, None).unwrap())
     }

     fn join(
@@ -2069,7 +2069,8 @@ mod tests {
             build_table_i32(("a1", &vec![2]), ("b2", &vec![2]), ("c1", &vec![9]));
         let schema = batch1.schema();
         let left = Arc::new(
-            MemoryExec::try_new(&[vec![batch1], vec![batch2]], schema, None).unwrap(),
+            MemoryExec::try_new(&[vec![batch1], vec![batch2]], schema, None, None)
+                .unwrap(),
         );

         let right = build_table(
@@ -2141,7 +2142,8 @@ mod tests {
         let schema = batch1.schema();

         let left = Arc::new(
-            MemoryExec::try_new(&[vec![batch1], vec![batch2]], schema, None).unwrap(),
+            MemoryExec::try_new(&[vec![batch1], vec![batch2]], schema, None, None)
+                .unwrap(),
         );
         let right = build_table(
             ("a2", &vec![20, 30, 10]),
@@ -2195,7 +2197,8 @@ mod tests {
             build_table_i32(("a2", &vec![30]), ("b1", &vec![5]), ("c2", &vec![90]));
         let schema = batch1.schema();
         let right = Arc::new(
-            MemoryExec::try_new(&[vec![batch1], vec![batch2]], schema, None).unwrap(),
+            MemoryExec::try_new(&[vec![batch1], vec![batch2]], schema, None, None)
+                .unwrap(),
         );

         let on = vec![(
@@ -2275,7 +2278,8 @@ mod tests {
         let batch = build_table_i32(a, b, c);
         let schema = batch.schema();
         Arc::new(
-            MemoryExec::try_new(&[vec![batch.clone(), batch]], schema, None).unwrap(),
+            MemoryExec::try_new(&[vec![batch.clone(), batch]], schema, None, None)
+                .unwrap(),
         )
     }
@@ -2381,7 +2385,8 @@ mod tests {
             Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
         )];
         let schema = right.schema();
-        let right = Arc::new(MemoryExec::try_new(&[vec![right]], schema, None).unwrap());
+        let right =
+            Arc::new(MemoryExec::try_new(&[vec![right]], schema, None, None).unwrap());
         let join = join(left, right, on, &JoinType::Left, false).unwrap();

         let columns = columns(&join.schema());
@@ -2418,7 +2423,8 @@ mod tests {
             Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
         )];
         let schema = right.schema();
-        let right = Arc::new(MemoryExec::try_new(&[vec![right]], schema, None).unwrap());
+        let right =
+            Arc::new(MemoryExec::try_new(&[vec![right]], schema, None, None).unwrap());
         let join = join(left, right, on, &JoinType::Full, false).unwrap();

         let columns = columns(&join.schema());
@@ -3704,13 +3710,14 @@ mod tests {
         let n: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
         let batch = RecordBatch::try_new(Arc::clone(&schema), vec![dates, n])?;
         let left = Arc::new(
-            MemoryExec::try_new(&[vec![batch]], Arc::clone(&schema), None).unwrap(),
+            MemoryExec::try_new(&[vec![batch]], Arc::clone(&schema), None, None).unwrap(),
         );

         let dates: ArrayRef = Arc::new(Date32Array::from(vec![19108, 19108, 19109]));
         let n: ArrayRef = Arc::new(Int32Array::from(vec![4, 5, 6]));
         let batch = RecordBatch::try_new(Arc::clone(&schema), vec![dates, n])?;
-        let right = Arc::new(MemoryExec::try_new(&[vec![batch]], schema, None).unwrap());
+        let right =
+            Arc::new(MemoryExec::try_new(&[vec![batch]], schema, None, None).unwrap());

         let on = vec![(
             Arc::new(Column::new_with_schema("date", &left.schema()).unwrap()) as _,
@@ -4001,6 +4008,7 @@ mod tests {
                 &[vec![left_batch.clone()], vec![left_batch.clone()]],
                 left_batch.schema(),
                 None,
+                None,
             )
             .unwrap(),
         );
@@ -4014,6 +4022,7 @@ mod tests {
                 &[vec![right_batch.clone()], vec![right_batch.clone()]],
                 right_batch.schema(),
                 None,
+                None,
             )
             .unwrap(),
         );
@@ -4091,7 +4100,7 @@ mod tests {
         )
         .unwrap();
         let schema_ref = batch.schema();
-        Arc::new(MemoryExec::try_new(&[vec![batch]], schema_ref, None).unwrap())
+        Arc::new(MemoryExec::try_new(&[vec![batch]], schema_ref, None, None).unwrap())
     }

     #[tokio::test]
diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs
index c69fa2888806..c4051e3a4579 100644
--- a/datafusion/physical-plan/src/joins/nested_loop_join.rs
+++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs
@@ -948,7 +948,7 @@ pub(crate) mod tests {
         };

         let mut exec =
-            MemoryExec::try_new(&[batches], Arc::clone(&schema), None).unwrap();
+            MemoryExec::try_new(&[batches], Arc::clone(&schema), None, None).unwrap();
         if !sorted_column_names.is_empty() {
             let mut sort_info = LexOrdering::default();
             for name in sorted_column_names {
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs
index 438d9818475d..8974bf230c0b 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs
@@ -2381,12 +2381,12 @@ mod tests {
     ) -> Arc<dyn ExecutionPlan> {
         let batch = build_table_i32(a, b, c);
         let schema = batch.schema();
-        Arc::new(MemoryExec::try_new(&[vec![batch]], schema, None).unwrap())
+        Arc::new(MemoryExec::try_new(&[vec![batch]], schema, None, None).unwrap())
     }

     fn build_table_from_batches(batches: Vec<RecordBatch>) -> Arc<dyn ExecutionPlan> {
         let schema = batches.first().unwrap().schema();
-        Arc::new(MemoryExec::try_new(&[batches], schema, None).unwrap())
+        Arc::new(MemoryExec::try_new(&[batches], schema, None, None).unwrap())
     }

     fn build_date_table(
@@ -2411,7 +2411,7 @@ mod tests {
         .unwrap();

         let schema = batch.schema();
-        Arc::new(MemoryExec::try_new(&[vec![batch]], schema, None).unwrap())
+        Arc::new(MemoryExec::try_new(&[vec![batch]], schema, None, None).unwrap())
     }

     fn build_date64_table(
@@ -2436,7 +2436,7 @@ mod tests {
         .unwrap();

         let schema = batch.schema();
-        Arc::new(MemoryExec::try_new(&[vec![batch]], schema, None).unwrap())
+        Arc::new(MemoryExec::try_new(&[vec![batch]], schema, None, None).unwrap())
     }

     /// returns a table with 3 columns of i32 in memory
@@ -2459,7 +2459,7 @@ mod tests {
             ],
         )
         .unwrap();
-        Arc::new(MemoryExec::try_new(&[vec![batch]], schema, None).unwrap())
+        Arc::new(MemoryExec::try_new(&[vec![batch]], schema, None, None).unwrap())
     }

     fn join(
diff --git a/datafusion/physical-plan/src/joins/test_utils.rs b/datafusion/physical-plan/src/joins/test_utils.rs
index 37d6c0aff850..5ce21842bfcc 100644
--- a/datafusion/physical-plan/src/joins/test_utils.rs
+++ b/datafusion/physical-plan/src/joins/test_utils.rs
@@ -530,10 +530,10 @@ pub fn create_memory_table(
     right_sorted: Vec<LexOrdering>,
 ) -> Result<(Arc<dyn ExecutionPlan>, Arc<dyn ExecutionPlan>)> {
     let left_schema = left_partition[0].schema();
-    let left = MemoryExec::try_new(&[left_partition], left_schema, None)?
+    let left = MemoryExec::try_new(&[left_partition], left_schema, None, None)?
         .try_with_sort_information(left_sorted)?;
     let right_schema = right_partition[0].schema();
-    let right = MemoryExec::try_new(&[right_partition], right_schema, None)?
+    let right = MemoryExec::try_new(&[right_partition], right_schema, None, None)?
         .try_with_sort_information(right_sorted)?;
     Ok((Arc::new(left), Arc::new(right)))
 }
diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs
index 521008ce9b02..2866645f24be 100644
--- a/datafusion/physical-plan/src/memory.rs
+++ b/datafusion/physical-plan/src/memory.rs
@@ -31,7 +31,7 @@ use crate::execution_plan::{Boundedness, EmissionType};
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
-use datafusion_common::{internal_err, project_schema, Result};
+use datafusion_common::{internal_err, project_schema, Constraints, Result};
 use datafusion_execution::memory_pool::MemoryReservation;
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::equivalence::ProjectionMapping;
@@ -50,6 +50,8 @@ pub struct MemoryExec {
     schema: SchemaRef,
     /// Schema representing the data after the optional projection is applied
     projected_schema: SchemaRef,
+    /// Table constraints
+    constraints: Constraints,
     /// Optional projection
     projection: Option<Vec<usize>>,
     // Sort information: one or more equivalent orderings
@@ -158,15 +160,22 @@ impl MemoryExec {
     pub fn try_new(
         partitions: &[Vec<RecordBatch>],
         schema: SchemaRef,
+        constraints: Option<Constraints>,
         projection: Option<Vec<usize>>,
     ) -> Result<Self> {
         let projected_schema = project_schema(&schema, projection.as_ref())?;
-        let cache =
-            Self::compute_properties(Arc::clone(&projected_schema), &[], partitions);
+        let constraints = constraints.unwrap_or_else(Constraints::empty);
+        let cache = Self::compute_properties(
+            Arc::clone(&projected_schema),
+            &[],
+            &constraints,
+            partitions,
+        );
         Ok(Self {
             partitions: partitions.to_vec(),
             schema,
             projected_schema,
+            constraints,
             projection,
             sort_information: vec![],
             cache,
@@ -180,6 +189,11 @@ impl MemoryExec {
         self
     }

+    /// Ref to constraints
+    pub fn constraints(&self) -> &Constraints {
+        &self.constraints
+    }
+
     /// Ref to partitions
     pub fn partitions(&self) -> &[Vec<RecordBatch>] {
         &self.partitions
     }
@@ -284,10 +298,12 @@ impl MemoryExec {
     fn compute_properties(
         schema: SchemaRef,
         orderings: &[LexOrdering],
+        constraints: &Constraints,
         partitions: &[Vec<RecordBatch>],
     ) -> PlanProperties {
         PlanProperties::new(
-            EquivalenceProperties::new_with_orderings(schema, orderings),
+            EquivalenceProperties::new_with_orderings(schema, orderings)
+                .with_constraints(constraints.clone()),
             Partitioning::UnknownPartitioning(partitions.len()),
             EmissionType::Incremental,
             Boundedness::Bounded,
@@ -563,7 +579,7 @@ mod memory_exec_tests {
         expected_output_order.extend(sort2.clone());
         let sort_information = vec![sort1.clone(), sort2.clone()];

-        let mem_exec = MemoryExec::try_new(&[vec![]], schema, None)?
+        let mem_exec = MemoryExec::try_new(&[vec![]], schema, None, None)?
             .try_with_sort_information(sort_information)?;

         assert_eq!(
diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs
index 270aabeb553c..f6ff6c8ba568 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -1124,7 +1124,8 @@ mod tests {
     ) -> Result<Vec<Vec<RecordBatch>>> {
         let task_ctx = Arc::new(TaskContext::default());
         // create physical plan
-        let exec = MemoryExec::try_new(&input_partitions, Arc::clone(schema), None)?;
+        let exec =
+            MemoryExec::try_new(&input_partitions, Arc::clone(schema), None, None)?;
         let exec = RepartitionExec::try_new(Arc::new(exec), partitioning)?;

         // execute and collect results
@@ -1515,7 +1516,8 @@ mod tests {
         let task_ctx = Arc::new(task_ctx);

         // create physical plan
-        let exec = MemoryExec::try_new(&input_partitions, Arc::clone(&schema), None)?;
+        let exec =
+            MemoryExec::try_new(&input_partitions, Arc::clone(&schema), None, None)?;
         let exec = RepartitionExec::try_new(Arc::new(exec), partitioning)?;

         // pull partitions
@@ -1661,7 +1663,7 @@ mod test {
     }

     fn memory_exec(schema: &SchemaRef) -> Arc<dyn ExecutionPlan> {
-        Arc::new(MemoryExec::try_new(&[vec![]], Arc::clone(schema), None).unwrap())
+        Arc::new(MemoryExec::try_new(&[vec![]], Arc::clone(schema), None, None).unwrap())
     }

     fn sorted_memory_exec(
@@ -1669,7 +1671,7 @@ mod test {
         sort_exprs: LexOrdering,
     ) -> Arc<dyn ExecutionPlan> {
         Arc::new(
-            MemoryExec::try_new(&[vec![]], Arc::clone(schema), None)
+            MemoryExec::try_new(&[vec![]], Arc::clone(schema), None, None)
                 .unwrap()
                 .try_with_sort_information(vec![sort_exprs])
                 .unwrap(),
diff --git a/datafusion/physical-plan/src/sorts/partial_sort.rs b/datafusion/physical-plan/src/sorts/partial_sort.rs
index c838376a482e..60793108057a 100644
--- a/datafusion/physical-plan/src/sorts/partial_sort.rs
+++ b/datafusion/physical-plan/src/sorts/partial_sort.rs
@@ -700,6 +700,7 @@ mod tests {
             &[vec![batch1, batch2, batch3, batch4]],
             Arc::clone(&schema),
             None,
+            None,
         )
         .unwrap(),
     ) as Arc<dyn ExecutionPlan>
@@ -885,6 +886,7 @@ mod tests {
             &[vec![batch]],
             Arc::clone(&schema),
             None,
+            None,
         )?);

         let partial_sort_exec = Arc::new(PartialSortExec::new(
@@ -991,7 +993,7 @@ mod tests {
                     options: option_desc,
                 },
             ]),
-            Arc::new(MemoryExec::try_new(&[vec![batch]], schema, None)?),
+            Arc::new(MemoryExec::try_new(&[vec![batch]], schema, None, None)?),
             2,
         ));
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index 33c8a2b2fee3..c7ecf3486dce 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -1345,7 +1345,7 @@ mod tests {
         let batch = RecordBatch::try_new(Arc::clone(&schema), vec![data]).unwrap();

         let input = Arc::new(
-            MemoryExec::try_new(&[vec![batch]], Arc::clone(&schema), None).unwrap(),
+            MemoryExec::try_new(&[vec![batch]], Arc::clone(&schema), None, None).unwrap(),
         );

         let sort_exec = Arc::new(SortExec::new(
@@ -1420,6 +1420,7 @@ mod tests {
                 &[vec![batch]],
                 Arc::clone(&schema),
                 None,
+                None,
             )?),
         ));
@@ -1506,7 +1507,7 @@ mod tests {
                 },
             },
         ]),
-            Arc::new(MemoryExec::try_new(&[vec![batch]], schema, None)?),
+            Arc::new(MemoryExec::try_new(&[vec![batch]], schema, None, None)?),
         ));

         assert_eq!(DataType::Float32, *sort_exec.schema().field(0).data_type());
diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
index adcb28e538fd..92a08f1022cc 100644
--- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
+++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
@@ -415,7 +415,7 @@ mod tests {
         },
     ]);

-        let exec = MemoryExec::try_new(&[rbs], schema, None).unwrap();
+        let exec = MemoryExec::try_new(&[rbs], schema, None, None).unwrap();
         let repartition_exec =
             RepartitionExec::try_new(Arc::new(exec), Partitioning::RoundRobinBatch(2))?;
         let coalesce_batches_exec =
@@ -507,8 +507,9 @@ mod tests {
         let schema = batch.schema();
         let sort = LexOrdering::default(); // no sort expressions
-        let exec = MemoryExec::try_new(&[vec![batch.clone()], vec![batch]], schema, None)
-            .unwrap();
+        let exec =
+            MemoryExec::try_new(&[vec![batch.clone()], vec![batch]], schema, None, None)
+                .unwrap();
         let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec)));

         let res = collect(merge, task_ctx).await.unwrap_err();
@@ -695,7 +696,7 @@ mod tests {
             options: Default::default(),
         },
     ]);
-    let exec = MemoryExec::try_new(partitions, schema, None).unwrap();
+    let exec = MemoryExec::try_new(partitions, schema, None, None).unwrap();
     let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec)));

     let collected = collect(merge, context).await.unwrap();
@@ -804,7 +805,7 @@ mod tests {
         let split: Vec<_> = sizes.iter().map(|x| split_batch(&sorted, *x)).collect();

         Ok(Arc::new(
-            MemoryExec::try_new(&split, sorted.schema(), None).unwrap(),
+            MemoryExec::try_new(&split, sorted.schema(), None, None).unwrap(),
         ))
     }
@@ -933,7 +934,8 @@ mod tests {
                 },
             },
         ]);
-        let exec = MemoryExec::try_new(&[vec![b1], vec![b2]], schema, None).unwrap();
+        let exec =
+            MemoryExec::try_new(&[vec![b1], vec![b2]], schema, None, None).unwrap();
         let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec)));

         let collected = collect(merge, task_ctx).await.unwrap();
@@ -975,7 +977,7 @@ mod tests {
                 nulls_first: true,
             },
         }]);
-        let exec = MemoryExec::try_new(&[vec![batch]], schema, None).unwrap();
+        let exec = MemoryExec::try_new(&[vec![batch]], schema, None, None).unwrap();
         let merge = Arc::new(
             SortPreservingMergeExec::new(sort, Arc::new(exec)).with_fetch(Some(2)),
         );
@@ -1011,7 +1013,7 @@ mod tests {
                 nulls_first: true,
             },
         }]);
-        let exec = MemoryExec::try_new(&[vec![batch]], schema, None).unwrap();
+        let exec = MemoryExec::try_new(&[vec![batch]], schema, None, None).unwrap();
         let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec)));

         let collected = collect(merge, task_ctx).await.unwrap();
@@ -1120,7 +1122,8 @@ mod tests {
             expr: col("b", &schema).unwrap(),
             options: Default::default(),
         }]);
-        let exec = MemoryExec::try_new(&[vec![b1], vec![b2]], schema, None).unwrap();
+        let exec =
+            MemoryExec::try_new(&[vec![b1], vec![b2]], schema, None, None).unwrap();
         let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec)));

         let collected = collect(Arc::clone(&merge) as Arc<dyn ExecutionPlan>, task_ctx)
@@ -1231,7 +1234,7 @@ mod tests {
             },
         }]);

-        let exec = MemoryExec::try_new(&partitions, schema, None).unwrap();
+        let exec = MemoryExec::try_new(&partitions, schema, None, None).unwrap();
         let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec)));

         let collected = collect(merge, task_ctx).await.unwrap();
diff --git a/datafusion/physical-plan/src/test.rs b/datafusion/physical-plan/src/test.rs
index 90ec9b106850..f2a834b4d73e 100644
--- a/datafusion/physical-plan/src/test.rs
+++ b/datafusion/physical-plan/src/test.rs
@@ -96,7 +96,7 @@ pub fn build_table_scan_i32(
 ) -> Arc<dyn ExecutionPlan> {
     let batch = build_table_i32(a, b, c);
     let schema = batch.schema();
-    Arc::new(MemoryExec::try_new(&[vec![batch]], schema, None).unwrap())
+    Arc::new(MemoryExec::try_new(&[vec![batch]], schema, None, None).unwrap())
 }

 /// Return a RecordBatch with a single Int32 array with values (0..sz) in a field named "i"
@@ -122,7 +122,7 @@ pub fn mem_exec(partitions: usize) -> MemoryExec {
     let schema = data[0][0].schema();
     let projection = None;
-    MemoryExec::try_new(&data, schema, projection).unwrap()
+    MemoryExec::try_new(&data, schema, None, projection).unwrap()
 }

 // Construct a stream partition for test purposes
diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs
index 6e768a3d87bc..a3c420c4e15a 100644
--- a/datafusion/physical-plan/src/union.rs
+++ b/datafusion/physical-plan/src/union.rs
@@ -814,11 +814,11 @@ mod tests {
             .map(|ordering| convert_to_sort_exprs(ordering))
             .collect::<Vec<_>>();
         let child1 = Arc::new(
-            MemoryExec::try_new(&[], Arc::clone(&schema), None)?
+            MemoryExec::try_new(&[], Arc::clone(&schema), None, None)?
                 .try_with_sort_information(first_orderings)?,
         );
         let child2 = Arc::new(
-            MemoryExec::try_new(&[], Arc::clone(&schema), None)?
+            MemoryExec::try_new(&[], Arc::clone(&schema), None, None)?
                 .try_with_sort_information(second_orderings)?,
         );
diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
index 2ac86da92e50..ec73fb9a1e20 100644
--- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
@@ -1535,6 +1535,7 @@ mod tests {
             &[vec![batch.clone(), batch.clone(), batch.clone()]],
             Arc::clone(&schema),
             None,
+            None,
         )
         .map(|e| Arc::new(e) as Arc<dyn ExecutionPlan>)?;
         let col_a = col("a", &schema)?;
diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt
index 56f088dfd10f..7051ed7115a1 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -3958,8 +3958,7 @@ logical_plan
 physical_plan
 01)ProjectionExec: expr=[sn@0 as sn, ts@1 as ts, currency@2 as currency, amount@3 as amount, sum(table_with_pk.amount) ORDER BY [table_with_pk.sn ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as sum1]
 02)--BoundedWindowAggExec: wdw=[sum(table_with_pk.amount) ORDER BY [table_with_pk.sn ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "sum(table_with_pk.amount) ORDER BY [table_with_pk.sn ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow, is_causal: true }], mode=[Sorted]
-03)----SortExec: expr=[sn@0 ASC NULLS LAST], preserve_partitioning=[false]
-04)------MemoryExec: partitions=1, partition_sizes=[1]
+03)----MemoryExec: partitions=1, partition_sizes=[1]

# test ROW_NUMBER window function returns correct data_type
query T
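Note on migration: for downstream code that builds a `MemoryExec` directly, the change is mechanical — pass `None` as the new third argument to keep the previous behaviour, or pass `Some(Constraints)` so the constraints reach the node's plan properties, where constraint-aware optimizer rules can use them (the `window.slt` plan simplification above is one example). Below is a minimal usage sketch, assuming the DataFusion API targeted by this patch; the helper name, the column, and the primary-key choice are illustrative, not part of the diff.

```rust
use std::sync::Arc;

use datafusion::arrow::array::Int32Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::{Constraint, Constraints, Result};
use datafusion::physical_plan::memory::MemoryExec;

/// Hypothetical helper: build a single-partition MemoryExec and forward a
/// primary-key constraint on column 0 so the optimizer can see it.
fn memory_exec_with_pk(batch: RecordBatch) -> Result<Arc<MemoryExec>> {
    let schema = batch.schema();
    // Constraint::PrimaryKey takes the indices of the key columns.
    let constraints = Constraints::new_unverified(vec![Constraint::PrimaryKey(vec![0])]);
    Ok(Arc::new(MemoryExec::try_new(
        &[vec![batch]],    // one partition containing one batch
        schema,
        Some(constraints), // new argument introduced by this change
        None,              // no projection
    )?))
}

fn main() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;
    let exec = memory_exec_with_pk(batch)?;
    // The constraints are now carried by the node and its plan properties.
    println!("{:?}", exec.constraints());
    Ok(())
}
```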