Skip to content

Commit

Permalink
add another test
Browse files Browse the repository at this point in the history
  • Loading branch information
samster25 committed Dec 15, 2023
1 parent ead6c80 commit 6d21747
Showing 1 changed file with 57 additions and 8 deletions.
65 changes: 57 additions & 8 deletions src/daft-plan/src/optimization/rules/push_down_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,19 @@ impl OptimizerRule for PushDownLimit {
}) => {
let new_limit = limit.min(*child_limit as usize);
let new_eager = eager | child_eagar;
Ok(Transformed::Yes(
LogicalPlan::Limit(LogicalLimit::new(
input.clone(),
new_limit as i64,
new_eager,
))
.into(),
))

let new_plan = Arc::new(LogicalPlan::Limit(LogicalLimit::new(
input.clone(),
new_limit as i64,
new_eager,
)));
// Re-run this rule on the folded plan; ideally this goes away once we move to a visitor pattern.
let optimized = self
.try_optimize(new_plan.clone())?
.or(Transformed::Yes(new_plan))
.unwrap()
.clone();
Ok(Transformed::Yes(optimized))
}
_ => Ok(Transformed::No(plan)),
}
Expand Down Expand Up @@ -189,6 +194,50 @@ mod tests {
Ok(())
}

/// Tests that two stacked Limits fold into a single Limit carrying the smaller value,
/// and that the Source is expected to report that folded value as its limit pushdown.
///
/// Limit[x]-Limit[y] -> Limit[min(x,y)]
#[test]
fn limit_folds_with_smaller_limit() -> DaftResult<()> {
    let schema_fields = vec![
        Field::new("a", DataType::Int64),
        Field::new("b", DataType::Utf8),
    ];
    // Scan built with default pushdowns, then a limit of 5 followed by a limit of 10.
    let scan = dummy_scan_node_with_pushdowns(schema_fields, Pushdowns::default());
    let plan = scan.limit(5, false)?.limit(10, false)?.build();
    // Only the smaller limit (5) should survive, both as the Limit node and in the Source.
    let expected = "\
        Limit: 5\
        \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig { buffer_size: None, chunk_size: None }), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Limit pushdown = 5, Output schema = a (Int64), b (Utf8)";
    assert_optimized_plan_eq(plan, expected)?;
    Ok(())
}

/// Tests that two stacked Limits fold into a single Limit carrying the smaller value
/// when the Source already has a larger limit pushdown, and that the Source is
/// expected to end up reporting the folded value instead.
///
/// Limit[x]-Limit[y] -> Limit[min(x,y)]
#[test]
fn limit_folds_with_large_limit() -> DaftResult<()> {
    let schema_fields = vec![
        Field::new("a", DataType::Int64),
        Field::new("b", DataType::Utf8),
    ];
    // The Source starts with a limit pushdown of 20, larger than either Limit applied below.
    let initial_pushdowns = Pushdowns::default().with_limit(Some(20));
    let scan = dummy_scan_node_with_pushdowns(schema_fields, initial_pushdowns);
    // A limit of 10 followed by a limit of 5.
    let plan = scan.limit(10, false)?.limit(5, false)?.build();
    // The smallest value (5) should survive, both as the Limit node and in the Source.
    let expected = "\
        Limit: 5\
        \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig { buffer_size: None, chunk_size: None }), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Limit pushdown = 5, Output schema = a (Int64), b (Utf8)";
    assert_optimized_plan_eq(plan, expected)?;
    Ok(())
}

/// Tests that Limit does push into external Source with existing larger limit.
///
/// Limit-Source[existing_limit] -> Source[new_limit]
Expand Down

0 comments on commit 6d21747

Please sign in to comment.