Skip to content

Commit

Permalink
Pass constraints to MemoryExec
Browse files Browse the repository at this point in the history
  • Loading branch information
gokselk committed Jan 3, 2025
1 parent 2fbe8c5 commit 82361a7
Show file tree
Hide file tree
Showing 34 changed files with 125 additions and 69 deletions.
1 change: 1 addition & 0 deletions datafusion-cli/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ impl TableProvider for ParquetMetadataTable {
Ok(Arc::new(MemoryExec::try_new(
&[vec![self.batch.clone()]],
TableProvider::schema(self),
None,
projection.cloned(),
)?))
}
Expand Down
1 change: 1 addition & 0 deletions datafusion-examples/examples/remote_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ impl TableProvider for RemoteTable {
Ok(Arc::new(MemoryExec::try_new(
&[batches],
self.schema.clone(),
None,
projection.cloned(),
)?))
}
Expand Down
1 change: 1 addition & 0 deletions datafusion-examples/examples/simple_udtf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ impl TableProvider for LocalCsvTable {
Ok(Arc::new(MemoryExec::try_new(
&[batches],
TableProvider::schema(self),
None,
projection.cloned(),
)?))
}
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/benches/physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ fn sort_preserving_merge_operator(
&batches.into_iter().map(|rb| vec![rb]).collect::<Vec<_>>(),
schema,
None,
None,
)
.unwrap();
let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec)));
Expand Down
8 changes: 4 additions & 4 deletions datafusion/core/benches/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ impl BenchCase {
let schema = partitions[0][0].schema();
let sort = make_sort_exprs(schema.as_ref());

let exec = MemoryExec::try_new(partitions, schema, None).unwrap();
let exec = MemoryExec::try_new(partitions, schema, None, None).unwrap();
let plan = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec)));

Self {
Expand All @@ -186,7 +186,7 @@ impl BenchCase {
let schema = partitions[0][0].schema();
let sort = make_sort_exprs(schema.as_ref());

let exec = MemoryExec::try_new(partitions, schema, None).unwrap();
let exec = MemoryExec::try_new(partitions, schema, None, None).unwrap();
let exec =
SortExec::new(sort.clone(), Arc::new(exec)).with_preserve_partitioning(true);
let plan = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec)));
Expand All @@ -208,7 +208,7 @@ impl BenchCase {
let schema = partitions[0][0].schema();
let sort = make_sort_exprs(schema.as_ref());

let exec = MemoryExec::try_new(partitions, schema, None).unwrap();
let exec = MemoryExec::try_new(partitions, schema, None, None).unwrap();
let exec = Arc::new(CoalescePartitionsExec::new(Arc::new(exec)));
let plan = Arc::new(SortExec::new(sort, exec));

Expand All @@ -229,7 +229,7 @@ impl BenchCase {
let schema = partitions[0][0].schema();
let sort = make_sort_exprs(schema.as_ref());

let exec = MemoryExec::try_new(partitions, schema, None).unwrap();
let exec = MemoryExec::try_new(partitions, schema, None, None).unwrap();
let exec = SortExec::new(sort, Arc::new(exec)).with_preserve_partitioning(true);
let plan = Arc::new(CoalescePartitionsExec::new(Arc::new(exec)));

Expand Down
12 changes: 9 additions & 3 deletions datafusion/core/src/datasource/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ impl MemTable {
state: &SessionState,
) -> Result<Self> {
let schema = t.schema();
let constraints = t.constraints();
let exec = t.scan(state, None, &[], None).await?;
let partition_count = exec.output_partitioning().partition_count();

Expand Down Expand Up @@ -162,7 +163,8 @@ impl MemTable {
}
}

let exec = MemoryExec::try_new(&data, Arc::clone(&schema), None)?;
let exec =
MemoryExec::try_new(&data, Arc::clone(&schema), constraints.cloned(), None)?;

if let Some(num_partitions) = output_partitions {
let exec = RepartitionExec::try_new(
Expand Down Expand Up @@ -220,8 +222,12 @@ impl TableProvider for MemTable {
partitions.push(inner_vec.clone())
}

let mut exec =
MemoryExec::try_new(&partitions, self.schema(), projection.cloned())?;
let mut exec = MemoryExec::try_new(
&partitions,
self.schema(),
self.constraints().cloned(),
projection.cloned(),
)?;

let show_sizes = state.config_options().explain.show_sizes;
exec = exec.with_show_sizes(show_sizes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ fn try_swapping_with_memory(
MemoryExec::try_new(
memory.partitions(),
memory.original_schema(),
Some(memory.constraints().to_owned()),
Some(new_projections),
)
.map(|e| Arc::new(e) as _)
Expand Down Expand Up @@ -1770,7 +1771,7 @@ mod tests {
Field::new("e", DataType::Int32, true),
]));

Arc::new(MemoryExec::try_new(&[], schema, Some(vec![2, 0, 3, 4])).unwrap())
Arc::new(MemoryExec::try_new(&[], schema, None, Some(vec![2, 0, 3, 4])).unwrap())
}

#[test]
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_optimizer/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ pub fn coalesce_partitions_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn Execut
}

pub(crate) fn memory_exec(schema: &SchemaRef) -> Arc<dyn ExecutionPlan> {
Arc::new(MemoryExec::try_new(&[vec![]], schema.clone(), None).unwrap())
Arc::new(MemoryExec::try_new(&[vec![]], schema.clone(), None, None).unwrap())
}

pub fn hash_join_exec(
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1940,7 +1940,7 @@ impl DefaultPhysicalPlanner {
let schema = record_batch.schema();
let partitions = vec![vec![record_batch]];
let projection = None;
let mem_exec = MemoryExec::try_new(&partitions, schema, projection)?;
let mem_exec = MemoryExec::try_new(&partitions, schema, None, projection)?;
Ok(Arc::new(mem_exec))
}

Expand Down
5 changes: 3 additions & 2 deletions datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,11 +302,12 @@ async fn run_aggregate_test(input1: Vec<RecordBatch>, group_by_columns: Vec<&str

let concat_input_record = concat_batches(&schema, &input1).unwrap();
let usual_source = Arc::new(
MemoryExec::try_new(&[vec![concat_input_record]], schema.clone(), None).unwrap(),
MemoryExec::try_new(&[vec![concat_input_record]], schema.clone(), None, None)
.unwrap(),
);

let running_source = Arc::new(
MemoryExec::try_new(&[input1.clone()], schema.clone(), None)
MemoryExec::try_new(&[input1.clone()], schema.clone(), None, None)
.unwrap()
.try_with_sort_information(vec![sort_keys])
.unwrap(),
Expand Down
10 changes: 6 additions & 4 deletions datafusion/core/tests/fuzz_cases/join_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,10 +404,12 @@ impl JoinFuzzTestCase {
fn left_right(&self) -> (Arc<MemoryExec>, Arc<MemoryExec>) {
let schema1 = self.input1[0].schema();
let schema2 = self.input2[0].schema();
let left =
Arc::new(MemoryExec::try_new(&[self.input1.clone()], schema1, None).unwrap());
let right =
Arc::new(MemoryExec::try_new(&[self.input2.clone()], schema2, None).unwrap());
let left = Arc::new(
MemoryExec::try_new(&[self.input1.clone()], schema1, None, None).unwrap(),
);
let right = Arc::new(
MemoryExec::try_new(&[self.input2.clone()], schema2, None, None).unwrap(),
);
(left, right)
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/fuzz_cases/merge_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ async fn run_merge_test(input: Vec<Vec<RecordBatch>>) {
},
}]);

let exec = MemoryExec::try_new(&input, schema, None).unwrap();
let exec = MemoryExec::try_new(&input, schema, None, None).unwrap();
let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec)));

let session_config = SessionConfig::new().with_batch_size(batch_size);
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/fuzz_cases/sort_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ impl SortTest {
},
}]);

let exec = MemoryExec::try_new(&input, schema, None).unwrap();
let exec = MemoryExec::try_new(&input, schema, None, None).unwrap();
let sort = Arc::new(SortExec::new(sort, Arc::new(exec)));

let session_config = SessionConfig::new();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ mod sp_repartition_fuzz_tests {
let concat_input_record = concat_batches(&schema, &input1).unwrap();

let running_source = Arc::new(
MemoryExec::try_new(&[input1.clone()], schema.clone(), None)
MemoryExec::try_new(&[input1.clone()], schema.clone(), None, None)
.unwrap()
.try_with_sort_information(vec![sort_keys.clone()])
.unwrap(),
Expand Down
5 changes: 3 additions & 2 deletions datafusion/core/tests/fuzz_cases/window_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ async fn bounded_window_causal_non_causal() -> Result<()> {
&[batches.clone()],
schema.clone(),
None,
None,
)?);

// Different window functions to test causality
Expand Down Expand Up @@ -639,7 +640,7 @@ async fn run_window_test(
},
]);
let mut exec1 = Arc::new(
MemoryExec::try_new(&[vec![concat_input_record]], schema.clone(), None)?
MemoryExec::try_new(&[vec![concat_input_record]], schema.clone(), None, None)?
.try_with_sort_information(vec![source_sort_keys.clone()])?,
) as _;
// Table is ordered according to ORDER BY a, b, c In linear test we use PARTITION BY b, ORDER BY a
Expand All @@ -665,7 +666,7 @@ async fn run_window_test(
vec![],
)?) as _;
let exec2 = Arc::new(
MemoryExec::try_new(&[input1.clone()], schema.clone(), None)?
MemoryExec::try_new(&[input1.clone()], schema.clone(), None, None)?
.try_with_sort_information(vec![source_sort_keys.clone()])?,
);
let running_window_exec = Arc::new(BoundedWindowAggExec::try_new(
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/memory_limit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -843,7 +843,7 @@ impl TableProvider for SortedTableProvider {
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let mem_exec =
MemoryExec::try_new(&self.batches, self.schema(), projection.cloned())?
MemoryExec::try_new(&self.batches, self.schema(), None, projection.cloned())?
.try_with_sort_information(self.sort_information.clone())?;

Ok(Arc::new(mem_exec))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ fn mock_data() -> Result<Arc<MemoryExec>> {
&[vec![batch]],
Arc::clone(&schema),
None,
None,
)?))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ impl TableProvider for SimpleCsvTable {
Ok(Arc::new(MemoryExec::try_new(
&[batches],
TableProvider::schema(self),
None,
projection.cloned(),
)?))
}
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-optimizer/src/aggregate_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ mod tests {
&[vec![batch]],
Arc::clone(&schema),
None,
None,
)?))
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/benches/spm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ fn generate_spm_for_round_robin_tie_breaker(
},
]);

let exec = MemoryExec::try_new(&partitiones, schema, None).unwrap();
let exec = MemoryExec::try_new(&partitiones, schema, None, None).unwrap();
SortPreservingMergeExec::new(sort, Arc::new(exec))
.with_round_robin_repartition(enable_round_robin_repartition)
}
Expand Down
5 changes: 5 additions & 0 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2185,6 +2185,7 @@ mod tests {
],
Arc::clone(&schema),
None,
None,
)?);
let aggregate_exec = Arc::new(AggregateExec::try_new(
AggregateMode::Partial,
Expand Down Expand Up @@ -2415,6 +2416,7 @@ mod tests {
&[input_batches],
Arc::clone(&schema),
None,
None,
)?);

let aggregate_exec = Arc::new(AggregateExec::try_new(
Expand Down Expand Up @@ -2538,6 +2540,7 @@ mod tests {
&[vec![batch.clone()]],
Arc::<Schema>::clone(&batch.schema()),
None,
None,
)?);
let aggregate_exec = Arc::new(AggregateExec::try_new(
AggregateMode::FinalPartitioned,
Expand Down Expand Up @@ -2607,6 +2610,7 @@ mod tests {
&[input_data],
Arc::clone(&schema),
None,
None,
)?);
let aggregate_exec = Arc::new(AggregateExec::try_new(
AggregateMode::Partial,
Expand Down Expand Up @@ -2697,6 +2701,7 @@ mod tests {
&[input_data],
Arc::clone(&schema),
None,
None,
)?);
let aggregate_exec = Arc::new(AggregateExec::try_new(
AggregateMode::Partial,
Expand Down
Loading

0 comments on commit 82361a7

Please sign in to comment.