Skip to content

Commit

Permalink
add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
samster25 committed Dec 15, 2023
1 parent fd0d6b6 commit 85dc31f
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 3 deletions.
3 changes: 1 addition & 2 deletions src/daft-plan/src/logical_ops/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ impl Source {
partitioning_keys,
pushdowns,
})) => {
res.push("Source:".to_string());
res.push(format!("Operator = {}", scan_op));
res.push(format!("Source: Operator = {}", scan_op));
res.push(format!("File schema = {}", source_schema.short_string()));
res.push(format!("Partitioning keys = {:?}", partitioning_keys));
res.extend(pushdowns.multiline_display());
Expand Down
36 changes: 35 additions & 1 deletion src/daft-plan/src/optimization/rules/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ mod tests {
rules::PushDownFilter,
Optimizer,
},
test::dummy_scan_node,
test::{dummy_scan_node, dummy_scan_operator_node},
JoinType, LogicalPlan, PartitionScheme,
};

Expand Down Expand Up @@ -324,6 +324,40 @@ mod tests {
Ok(())
}

/// Verifies that two stacked Filters above a ScanOperator are folded into
/// the operator's filter pushdown (note the expected plan has no Filter
/// node — both predicates appear under "Filter pushdown = ...").
#[test]
fn pushdown_filter_into_scan_operator() -> DaftResult<()> {
    let fields = vec![
        Field::new("a", DataType::Int64),
        Field::new("b", DataType::Utf8),
    ];
    let plan = dummy_scan_operator_node(fields)
        .filter(col("a").lt(&lit(2)))?
        .filter(col("b").eq(&lit("foo")))?
        .build();
    let expected = "\
Source: Operator = AnonymousScanOperator: File paths=[/foo], Format-specific config = Json(JsonSourceConfig { buffer_size: None, chunk_size: None }), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), File schema = a (Int64), b (Utf8), Partitioning keys = [], Filter pushdown = [col(b) == lit(\"foo\")] & [col(a) < lit(2)], Output schema = a (Int64), b (Utf8)";
    // Run the optimizer and compare the rendered plan against the expectation.
    assert_optimized_plan_eq(plan, expected)
}

/// Verifies that a Filter can't be pushed into a ScanOperator when a Limit
/// sits between them: the optimized plan keeps the Filter and Limit as
/// separate nodes above the Source.
#[test]
fn pushdown_filter_into_scan_operator_with_limit() -> DaftResult<()> {
    let fields = vec![
        Field::new("a", DataType::Int64),
        Field::new("b", DataType::Utf8),
    ];
    let plan = dummy_scan_operator_node(fields)
        .limit(1, false)?
        .filter(col("a").lt(&lit(2)))?
        .build();
    let expected = "\
Filter: col(a) < lit(2)\
\n  Limit: 1\
\n    Source: Operator = AnonymousScanOperator: File paths=[/foo], Format-specific config = Json(JsonSourceConfig { buffer_size: None, chunk_size: None }), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), File schema = a (Int64), b (Utf8), Partitioning keys = [], Output schema = a (Int64), b (Utf8)";
    // The Filter must remain above the Limit — no pushdown occurs here.
    assert_optimized_plan_eq(plan, expected)
}

/// Tests that Filter commutes with Projections.
#[test]
fn filter_commutes_with_projection() -> DaftResult<()> {
Expand Down
21 changes: 21 additions & 0 deletions src/daft-plan/src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,27 @@ pub fn dummy_scan_node_with_pushdowns(
pushdowns: Pushdowns,
) -> LogicalPlanBuilder {
let schema = Arc::new(Schema::new(fields).unwrap());

LogicalPlanBuilder::table_scan_with_pushdowns(
FileInfos::new_internal(vec!["/foo".to_string()], vec![None], vec![None]),
schema,
FileFormatConfig::Json(Default::default()).into(),
StorageConfig::Native(NativeStorageConfig::new_internal(true, None).into()).into(),
pushdowns,
)
.unwrap()
}

pub fn dummy_scan_operator_node(fields: Vec<Field>) -> LogicalPlanBuilder {
dummy_scan_operator_node_with_pushdowns(fields, Default::default())
}

/// Create a dummy scan node containing the provided fields in its schema and the provided pushdowns.
pub fn dummy_scan_operator_node_with_pushdowns(
fields: Vec<Field>,
pushdowns: Pushdowns,
) -> LogicalPlanBuilder {
let schema = Arc::new(Schema::new(fields).unwrap());
let anon = daft_scan::AnonymousScanOperator::new(
vec!["/foo".to_string()],
schema,
Expand Down

0 comments on commit 85dc31f

Please sign in to comment.