Skip to content

Commit

Permalink
More tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Xiayue Charles Lin committed Sep 20, 2023
1 parent 36ad4bb commit 7cae5a0
Showing 1 changed file with 95 additions and 3 deletions.
98 changes: 95 additions & 3 deletions src/daft-plan/src/optimization/rules/push_down_projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,17 +498,16 @@ mod tests {
use daft_dsl::{binary_op, col, lit, Operator};

use crate::{
logical_ops::Project,
optimization::{
optimizer::{RuleBatch, RuleExecutionStrategy},
rules::PushDownProjection,
Optimizer,
},
test::dummy_scan_node,
JoinType, LogicalPlan, PartitionScheme,
LogicalPlan,
};

/// Helper that creates an optimizer with the PushDownFilter rule registered, optimizes
/// Helper that creates an optimizer with the PushDownProjection rule registered, optimizes
/// the provided plan with said optimizer, and compares the optimized plan's repr with
/// the provided expected repr.
fn assert_optimized_plan_eq(plan: Arc<LogicalPlan>, expected: &str) -> DaftResult<()> {
Expand Down Expand Up @@ -614,4 +613,97 @@ mod tests {

Ok(())
}

/// Projection<-Source
#[test]
fn test_projection_source() -> DaftResult<()> {
let unoptimized = dummy_scan_node(vec![
Field::new("a", DataType::Int64),
Field::new("b", DataType::Int64),
])
.project(
vec![binary_op(Operator::Plus, &col("b"), &lit(3))],
Default::default(),
)?
.build();

let expected = "\
Project: col(b) + lit(3), Partition spec = PartitionSpec { scheme: Unknown, num_partitions: 1, by: None }\
\n Source: Json, File paths = [/foo], File schema = a (Int64), b (Int64), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = b (Int64)";
assert_optimized_plan_eq(unoptimized, expected)?;

Ok(())
}

/// Projection<-Projection column pruning
#[test]
fn test_projection_projection() -> DaftResult<()> {
let unoptimized = dummy_scan_node(vec![
Field::new("a", DataType::Int64),
Field::new("b", DataType::Int64),
])
.project(
vec![
binary_op(Operator::Plus, &col("b"), &lit(3)),
col("a"),
col("a").alias("x"),
],
Default::default(),
)?
.project(
vec![col("a"), col("b"), col("b").alias("c")],
Default::default(),
)?
.build();

let expected = "\
Project: col(a), col(b), col(b) AS c, Partition spec = PartitionSpec { scheme: Unknown, num_partitions: 1, by: None }\
\n Project: col(b) + lit(3), col(a), Partition spec = PartitionSpec { scheme: Unknown, num_partitions: 1, by: None }\
\n Source: Json, File paths = [/foo], File schema = a (Int64), b (Int64), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), b (Int64)";
assert_optimized_plan_eq(unoptimized, expected)?;

Ok(())
}

/// Projection<-Aggregation column pruning
#[test]
fn test_projection_aggregation() -> DaftResult<()> {
let unoptimized = dummy_scan_node(vec![
Field::new("a", DataType::Int64),
Field::new("b", DataType::Int64),
Field::new("c", DataType::Int64),
])
.aggregate(vec![col("a").mean(), col("b").mean()], vec![col("c")])?
.project(vec![col("a")], Default::default())?
.build();

let expected = "\
Project: col(a), Partition spec = PartitionSpec { scheme: Unknown, num_partitions: 1, by: None }\
\n Aggregation: mean(col(a)), Group by = col(c), Output schema = c (Int64), a (Float64)\
\n Source: Json, File paths = [/foo], File schema = a (Int64), b (Int64), c (Int64), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), c (Int64)";
assert_optimized_plan_eq(unoptimized, expected)?;

Ok(())
}

/// Projection<-X pushes down the combined required columns
#[test]
fn test_projection_pushdown() -> DaftResult<()> {
let unoptimized = dummy_scan_node(vec![
Field::new("a", DataType::Int64),
Field::new("b", DataType::Boolean),
Field::new("c", DataType::Int64),
])
.filter(col("b"))?
.project(vec![col("a")], Default::default())?
.build();

let expected = "\
Project: col(a), Partition spec = PartitionSpec { scheme: Unknown, num_partitions: 1, by: None }\
\n Filter: col(b)\
\n Source: Json, File paths = [/foo], File schema = a (Int64), b (Boolean), c (Int64), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), b (Boolean)";
assert_optimized_plan_eq(unoptimized, expected)?;

Ok(())
}
}

0 comments on commit 7cae5a0

Please sign in to comment.