diff --git a/daft/daft.pyi b/daft/daft.pyi index 786d00e374..dffd96aa44 100644 --- a/daft/daft.pyi +++ b/daft/daft.pyi @@ -720,7 +720,7 @@ class LogicalPlanBuilder: ) -> LogicalPlanBuilder: ... def project(self, projection: list[PyExpr], resource_request: ResourceRequest) -> LogicalPlanBuilder: ... def filter(self, predicate: PyExpr) -> LogicalPlanBuilder: ... - def limit(self, limit: int) -> LogicalPlanBuilder: ... + def limit(self, limit: int, eager: bool) -> LogicalPlanBuilder: ... def explode(self, to_explode: list[PyExpr]) -> LogicalPlanBuilder: ... def sort(self, sort_by: list[PyExpr], descending: list[bool]) -> LogicalPlanBuilder: ... def repartition( diff --git a/daft/dataframe/dataframe.py b/daft/dataframe/dataframe.py index 54030c0553..f9f07d5b45 100644 --- a/daft/dataframe/dataframe.py +++ b/daft/dataframe/dataframe.py @@ -170,7 +170,7 @@ def show(self, n: int = 8) -> "DataFrameDisplay": DataFrameDisplay: object that has a rich tabular display """ df = self - df = df.limit(n) + df = df.limit(n, eager=True) df.collect(num_preview_rows=None) collected_preview = df._preview assert collected_preview is not None @@ -578,7 +578,7 @@ def sort( return DataFrame(builder) @DataframePublicAPI - def limit(self, num: int) -> "DataFrame": + def limit(self, num: int, eager: bool = False) -> "DataFrame": """Limits the rows in the DataFrame to the first ``N`` rows, similar to a SQL ``LIMIT`` Example: @@ -586,11 +586,13 @@ def limit(self, num: int) -> "DataFrame": Args: num (int): maximum rows to allow. 
+ eager (bool): whether to optimize for latency (time to first result) by eagerly executing + only one partition at a time, or for throughput by executing multiple partitions at a time Returns: DataFrame: Limited DataFrame """ - builder = self._builder.limit(num) + builder = self._builder.limit(num, eager=eager) return DataFrame(builder) @DataframePublicAPI diff --git a/daft/execution/physical_plan.py b/daft/execution/physical_plan.py index e4bc572bc6..4bf3f8ff95 100644 --- a/daft/execution/physical_plan.py +++ b/daft/execution/physical_plan.py @@ -277,6 +277,7 @@ def local_limit( def global_limit( child_plan: InProgressPhysicalPlan[PartitionT], limit_rows: int, + eager: bool, num_partitions: int, ) -> InProgressPhysicalPlan[PartitionT]: """Return the first n rows from the `child_plan`.""" @@ -342,6 +343,12 @@ def global_limit( yield None continue + # If running in eager mode, only allow one task in flight + if eager and len(materializations) > 0: + logger.debug(f"global_limit blocking on eager execution of: {materializations[0]}") + yield None + continue + # Execute a single child partition. 
try: child_step = child_plan.send(remaining_rows) if started else next(child_plan) diff --git a/daft/execution/physical_plan_factory.py b/daft/execution/physical_plan_factory.py index c90cd148ea..2211e5929a 100644 --- a/daft/execution/physical_plan_factory.py +++ b/daft/execution/physical_plan_factory.py @@ -104,6 +104,7 @@ def _get_physical_plan(node: LogicalPlan, psets: dict[str, list[PartitionT]]) -> return physical_plan.global_limit( child_plan=child_plan, limit_rows=node._num, + eager=node._eager, num_partitions=node.num_partitions(), ) diff --git a/daft/logical/builder.py b/daft/logical/builder.py index 78beb92a88..40ae5e421b 100644 --- a/daft/logical/builder.py +++ b/daft/logical/builder.py @@ -100,7 +100,7 @@ def filter(self, predicate: Expression) -> LogicalPlanBuilder: pass @abstractmethod - def limit(self, num_rows: int) -> LogicalPlanBuilder: + def limit(self, num_rows: int, eager: bool) -> LogicalPlanBuilder: pass @abstractmethod diff --git a/daft/logical/logical_plan.py b/daft/logical/logical_plan.py index dbba21bc07..338f512605 100644 --- a/daft/logical/logical_plan.py +++ b/daft/logical/logical_plan.py @@ -150,9 +150,9 @@ def project( def filter(self, predicate: Expression): return Filter(self._plan, ExpressionsProjection([predicate])).to_builder() - def limit(self, num_rows: int) -> LogicalPlanBuilder: + def limit(self, num_rows: int, eager: bool) -> LogicalPlanBuilder: local_limit = LocalLimit(self._plan, num=num_rows) - plan = GlobalLimit(local_limit, num=num_rows) + plan = GlobalLimit(local_limit, num=num_rows, eager=eager) return plan.to_builder() def explode(self, explode_expressions: list[Expression]) -> PyLogicalPlanBuilder: @@ -828,17 +828,18 @@ def rebuild(self) -> LogicalPlan: class GlobalLimit(UnaryNode): - def __init__(self, input: LogicalPlan, num: int) -> None: + def __init__(self, input: LogicalPlan, num: int, eager: bool) -> None: super().__init__(input.schema(), partition_spec=input.partition_spec(), op_level=OpLevel.GLOBAL) 
self._register_child(input) self._num = num + self._eager = eager def __repr__(self) -> str: return self._repr_helper(num=self._num) def copy_with_new_children(self, new_children: list[LogicalPlan]) -> LogicalPlan: assert len(new_children) == 1 - return GlobalLimit(new_children[0], self._num) + return GlobalLimit(new_children[0], self._num, self._eager) def required_columns(self) -> list[set[str]]: return [set()] @@ -850,7 +851,7 @@ def _local_eq(self, other: Any) -> bool: return isinstance(other, GlobalLimit) and self.schema() == other.schema() and self._num == other._num def rebuild(self) -> LogicalPlan: - return GlobalLimit(input=self._children()[0].rebuild(), num=self._num) + return GlobalLimit(input=self._children()[0].rebuild(), num=self._num, eager=self._eager) class LocalCount(UnaryNode): diff --git a/daft/logical/optimizer.py b/daft/logical/optimizer.py index 6b4db7537a..97210f03de 100644 --- a/daft/logical/optimizer.py +++ b/daft/logical/optimizer.py @@ -457,7 +457,7 @@ def _push_down_global_limit_into_unary_node(self, parent: GlobalLimit, child: Un """ logger.debug(f"pushing {parent} into {child}") grandchild = child._children()[0] - return child.copy_with_new_children([GlobalLimit(grandchild, num=parent._num)]) + return child.copy_with_new_children([GlobalLimit(grandchild, num=parent._num, eager=parent._eager)]) @property def _supported_unary_nodes(self) -> set[type[LogicalPlan]]: diff --git a/daft/logical/rust_logical_plan.py b/daft/logical/rust_logical_plan.py index a3bc920be2..3b48e24b4b 100644 --- a/daft/logical/rust_logical_plan.py +++ b/daft/logical/rust_logical_plan.py @@ -82,8 +82,8 @@ def filter(self, predicate: Expression) -> RustLogicalPlanBuilder: builder = self._builder.filter(predicate._expr) return RustLogicalPlanBuilder(builder) - def limit(self, num_rows: int) -> RustLogicalPlanBuilder: - builder = self._builder.limit(num_rows) + def limit(self, num_rows: int, eager: bool) -> RustLogicalPlanBuilder: + builder = 
self._builder.limit(num_rows, eager) return RustLogicalPlanBuilder(builder) def explode(self, explode_expressions: list[Expression]) -> RustLogicalPlanBuilder: diff --git a/src/daft-plan/src/builder.rs b/src/daft-plan/src/builder.rs index 142504825b..005fcabdd0 100644 --- a/src/daft-plan/src/builder.rs +++ b/src/daft-plan/src/builder.rs @@ -119,8 +119,9 @@ impl LogicalPlanBuilder { Ok(logical_plan.into()) } - pub fn limit(&self, limit: i64) -> DaftResult { - let logical_plan: LogicalPlan = logical_ops::Limit::new(self.plan.clone(), limit).into(); + pub fn limit(&self, limit: i64, eager: bool) -> DaftResult { + let logical_plan: LogicalPlan = + logical_ops::Limit::new(self.plan.clone(), limit, eager).into(); Ok(logical_plan.into()) } @@ -317,8 +318,8 @@ impl PyLogicalPlanBuilder { Ok(self.builder.filter(predicate.expr)?.into()) } - pub fn limit(&self, limit: i64) -> PyResult { - Ok(self.builder.limit(limit)?.into()) + pub fn limit(&self, limit: i64, eager: bool) -> PyResult { + Ok(self.builder.limit(limit, eager)?.into()) } pub fn explode(&self, to_explode: Vec) -> PyResult { diff --git a/src/daft-plan/src/logical_ops/limit.rs b/src/daft-plan/src/logical_ops/limit.rs index 4425e6db3d..4d91ee4a84 100644 --- a/src/daft-plan/src/logical_ops/limit.rs +++ b/src/daft-plan/src/logical_ops/limit.rs @@ -8,10 +8,17 @@ pub struct Limit { pub input: Arc, // Limit on number of rows. 
pub limit: i64, + // Whether to send tasks in waves (maximize throughput) or + // eagerly one-at-a-time (minimize time-to-first-result) + pub eager: bool, } impl Limit { - pub(crate) fn new(input: Arc, limit: i64) -> Self { - Self { input, limit } + pub(crate) fn new(input: Arc, limit: i64, eager: bool) -> Self { + Self { + input, + limit, + eager, + } } } diff --git a/src/daft-plan/src/logical_plan.rs b/src/daft-plan/src/logical_plan.rs index 454e2e1116..5fba68d491 100644 --- a/src/daft-plan/src/logical_plan.rs +++ b/src/daft-plan/src/logical_plan.rs @@ -226,7 +226,7 @@ impl LogicalPlan { input.clone(), projection.clone(), resource_request.clone(), ).unwrap()), Self::Filter(Filter { predicate, .. }) => Self::Filter(Filter::try_new(input.clone(), predicate.clone()).unwrap()), - Self::Limit(Limit { limit, .. }) => Self::Limit(Limit::new(input.clone(), *limit)), + Self::Limit(Limit { limit, eager, .. }) => Self::Limit(Limit::new(input.clone(), *limit, *eager)), Self::Explode(Explode { to_explode, .. }) => Self::Explode(Explode::try_new(input.clone(), to_explode.clone()).unwrap()), Self::Sort(Sort { sort_by, descending, .. }) => Self::Sort(Sort::try_new(input.clone(), sort_by.clone(), descending.clone()).unwrap()), Self::Repartition(Repartition { num_partitions, partition_by, scheme, .. }) => Self::Repartition(Repartition::new(input.clone(), *num_partitions, partition_by.clone(), scheme.clone())), diff --git a/src/daft-plan/src/optimization/rules/push_down_limit.rs b/src/daft-plan/src/optimization/rules/push_down_limit.rs index 3f5dbb2af5..2e9c209d2f 100644 --- a/src/daft-plan/src/optimization/rules/push_down_limit.rs +++ b/src/daft-plan/src/optimization/rules/push_down_limit.rs @@ -23,7 +23,7 @@ impl OptimizerRule for PushDownLimit { fn try_optimize(&self, plan: Arc) -> DaftResult>> { match plan.as_ref() { - LogicalPlan::Limit(LogicalLimit { input, limit }) => { + LogicalPlan::Limit(LogicalLimit { input, limit, .. 
}) => { let limit = *limit as usize; match input.as_ref() { // Naive commuting with unary ops. @@ -120,7 +120,7 @@ mod tests { Field::new("a", DataType::Int64), Field::new("b", DataType::Utf8), ]) - .limit(5)? + .limit(5, false)? .build(); let expected = "\ Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), b (Utf8), Limit = 5"; @@ -140,7 +140,7 @@ mod tests { ], Some(3), ) - .limit(5)? + .limit(5, false)? .build(); let expected = "\ Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), b (Utf8), Limit = 3"; @@ -160,7 +160,7 @@ mod tests { ], Some(10), ) - .limit(5)? + .limit(5, false)? .build(); let expected = "\ Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), b (Utf8), Limit = 5"; @@ -177,7 +177,7 @@ mod tests { let py_obj = Python::with_gil(|py| py.None()); let schema: Arc = Schema::new(vec![Field::new("a", DataType::Int64)])?.into(); let plan = LogicalPlanBuilder::in_memory_scan("foo", py_obj, schema, Default::default())? - .limit(5)? + .limit(5, false)? .build(); let expected = "\ Limit: 5\ @@ -196,7 +196,7 @@ mod tests { Field::new("b", DataType::Utf8), ]) .repartition(1, vec![col("a")], PartitionScheme::Hash)? - .limit(5)? + .limit(5, false)? .build(); let expected = "\ Repartition: Scheme = Hash, Number of partitions = 1, Partition by = col(a)\ @@ -215,7 +215,7 @@ mod tests { Field::new("b", DataType::Utf8), ]) .coalesce(1)? - .limit(5)? + .limit(5, false)? 
.build(); let expected = "\ Coalesce: To = 1\ @@ -234,7 +234,7 @@ mod tests { Field::new("b", DataType::Utf8), ]) .project(vec![col("a")], Default::default())? - .limit(5)? + .limit(5, false)? .build(); let expected = "\ Project: col(a), Partition spec = PartitionSpec { scheme: Unknown, num_partitions: 1, by: None }\ diff --git a/src/daft-plan/src/physical_ops/limit.rs b/src/daft-plan/src/physical_ops/limit.rs index d96c952ac7..c64346530c 100644 --- a/src/daft-plan/src/physical_ops/limit.rs +++ b/src/daft-plan/src/physical_ops/limit.rs @@ -8,14 +8,21 @@ pub struct Limit { // Upstream node. pub input: Arc, pub limit: i64, + pub eager: bool, pub num_partitions: usize, } impl Limit { - pub(crate) fn new(input: Arc, limit: i64, num_partitions: usize) -> Self { + pub(crate) fn new( + input: Arc, + limit: i64, + eager: bool, + num_partitions: usize, + ) -> Self { Self { input, limit, + eager, num_partitions, } } diff --git a/src/daft-plan/src/physical_plan.rs b/src/daft-plan/src/physical_plan.rs index fdfbc3b9d4..a3d13bbb23 100644 --- a/src/daft-plan/src/physical_plan.rs +++ b/src/daft-plan/src/physical_plan.rs @@ -303,6 +303,7 @@ impl PhysicalPlan { PhysicalPlan::Limit(Limit { input, limit, + eager, num_partitions, }) => { let upstream_iter = input.to_partition_tasks(py, psets, is_ray_runner)?; @@ -313,7 +314,7 @@ impl PhysicalPlan { .call1((upstream_iter, *limit))?; let global_limit_iter = py_physical_plan .getattr(pyo3::intern!(py, "global_limit"))? 
- .call1((local_limit_iter, *limit, *num_partitions))?; + .call1((local_limit_iter, *limit, *eager, *num_partitions))?; Ok(global_limit_iter.into()) } PhysicalPlan::Explode(Explode { input, to_explode }) => { diff --git a/src/daft-plan/src/planner.rs b/src/daft-plan/src/planner.rs index 31b2f5e0fb..bf35db6cda 100644 --- a/src/daft-plan/src/planner.rs +++ b/src/daft-plan/src/planner.rs @@ -102,11 +102,16 @@ pub fn plan(logical_plan: &LogicalPlan) -> DaftResult { predicate.clone(), ))) } - LogicalPlan::Limit(LogicalLimit { input, limit }) => { + LogicalPlan::Limit(LogicalLimit { + input, + limit, + eager, + }) => { let input_physical = plan(input)?; Ok(PhysicalPlan::Limit(Limit::new( input_physical.into(), *limit, + *eager, logical_plan.partition_spec().num_partitions, ))) }