Skip to content

Commit

Permalink
Add eager mode to limits
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Chia committed Oct 17, 2023
1 parent f439404 commit 7fa05b0
Show file tree
Hide file tree
Showing 15 changed files with 63 additions and 31 deletions.
2 changes: 1 addition & 1 deletion daft/daft.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -720,7 +720,7 @@ class LogicalPlanBuilder:
) -> LogicalPlanBuilder: ...
def project(self, projection: list[PyExpr], resource_request: ResourceRequest) -> LogicalPlanBuilder: ...
def filter(self, predicate: PyExpr) -> LogicalPlanBuilder: ...
def limit(self, limit: int) -> LogicalPlanBuilder: ...
def limit(self, limit: int, eager: bool) -> LogicalPlanBuilder: ...
def explode(self, to_explode: list[PyExpr]) -> LogicalPlanBuilder: ...
def sort(self, sort_by: list[PyExpr], descending: list[bool]) -> LogicalPlanBuilder: ...
def repartition(
Expand Down
8 changes: 5 additions & 3 deletions daft/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ def show(self, n: int = 8) -> "DataFrameDisplay":
DataFrameDisplay: object that has a rich tabular display
"""
df = self
df = df.limit(n)
df = df.limit(n, eager=True)
df.collect(num_preview_rows=None)
collected_preview = df._preview
assert collected_preview is not None
Expand Down Expand Up @@ -578,19 +578,21 @@ def sort(
return DataFrame(builder)

@DataframePublicAPI
def limit(self, num: int) -> "DataFrame":
def limit(self, num: int, eager: bool = False) -> "DataFrame":
"""Limits the rows in the DataFrame to the first ``N`` rows, similar to a SQL ``LIMIT``
Example:
>>> df_limited = df.limit(10) # returns 10 rows
Args:
num (int): maximum rows to allow.
eager (bool): whether to optimize for latency (time to first result) by eagerly executing
only one partition at a time, or for throughput by executing multiple partitions at a time
Returns:
DataFrame: Limited DataFrame
"""
builder = self._builder.limit(num)
builder = self._builder.limit(num, eager=eager)
return DataFrame(builder)

@DataframePublicAPI
Expand Down
7 changes: 7 additions & 0 deletions daft/execution/physical_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ def local_limit(
def global_limit(
child_plan: InProgressPhysicalPlan[PartitionT],
limit_rows: int,
eager: bool,
num_partitions: int,
) -> InProgressPhysicalPlan[PartitionT]:
"""Return the first n rows from the `child_plan`."""
Expand Down Expand Up @@ -342,6 +343,12 @@ def global_limit(
yield None
continue

# If running in eager mode, only allow one task in flight
if eager and len(materializations) > 0:
logger.debug(f"global_limit blocking on eager execution of: {materializations[0]}")
yield None
continue

# Execute a single child partition.
try:
child_step = child_plan.send(remaining_rows) if started else next(child_plan)
Expand Down
1 change: 1 addition & 0 deletions daft/execution/physical_plan_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ def _get_physical_plan(node: LogicalPlan, psets: dict[str, list[PartitionT]]) ->
return physical_plan.global_limit(
child_plan=child_plan,
limit_rows=node._num,
eager=node._eager,
num_partitions=node.num_partitions(),
)

Expand Down
2 changes: 1 addition & 1 deletion daft/logical/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def filter(self, predicate: Expression) -> LogicalPlanBuilder:
pass

@abstractmethod
def limit(self, num_rows: int) -> LogicalPlanBuilder:
def limit(self, num_rows: int, eager: bool) -> LogicalPlanBuilder:
pass

@abstractmethod
Expand Down
11 changes: 6 additions & 5 deletions daft/logical/logical_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,9 @@ def project(
def filter(self, predicate: Expression):
return Filter(self._plan, ExpressionsProjection([predicate])).to_builder()

def limit(self, num_rows: int) -> LogicalPlanBuilder:
def limit(self, num_rows: int, eager: bool) -> LogicalPlanBuilder:
local_limit = LocalLimit(self._plan, num=num_rows)
plan = GlobalLimit(local_limit, num=num_rows)
plan = GlobalLimit(local_limit, num=num_rows, eager=eager)
return plan.to_builder()

def explode(self, explode_expressions: list[Expression]) -> PyLogicalPlanBuilder:
Expand Down Expand Up @@ -828,17 +828,18 @@ def rebuild(self) -> LogicalPlan:


class GlobalLimit(UnaryNode):
def __init__(self, input: LogicalPlan, num: int) -> None:
def __init__(self, input: LogicalPlan, num: int, eager: bool) -> None:
super().__init__(input.schema(), partition_spec=input.partition_spec(), op_level=OpLevel.GLOBAL)
self._register_child(input)
self._num = num
self._eager = eager

def __repr__(self) -> str:
return self._repr_helper(num=self._num)

def copy_with_new_children(self, new_children: list[LogicalPlan]) -> LogicalPlan:
assert len(new_children) == 1
return GlobalLimit(new_children[0], self._num)
return GlobalLimit(new_children[0], self._num, self._eager)

def required_columns(self) -> list[set[str]]:
return [set()]
Expand All @@ -850,7 +851,7 @@ def _local_eq(self, other: Any) -> bool:
return isinstance(other, GlobalLimit) and self.schema() == other.schema() and self._num == other._num

def rebuild(self) -> LogicalPlan:
return GlobalLimit(input=self._children()[0].rebuild(), num=self._num)
return GlobalLimit(input=self._children()[0].rebuild(), num=self._num, eager=self._eager)


class LocalCount(UnaryNode):
Expand Down
2 changes: 1 addition & 1 deletion daft/logical/optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ def _push_down_global_limit_into_unary_node(self, parent: GlobalLimit, child: Un
"""
logger.debug(f"pushing {parent} into {child}")
grandchild = child._children()[0]
return child.copy_with_new_children([GlobalLimit(grandchild, num=parent._num)])
return child.copy_with_new_children([GlobalLimit(grandchild, num=parent._num, eager=parent._eager)])

@property
def _supported_unary_nodes(self) -> set[type[LogicalPlan]]:
Expand Down
4 changes: 2 additions & 2 deletions daft/logical/rust_logical_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ def filter(self, predicate: Expression) -> RustLogicalPlanBuilder:
builder = self._builder.filter(predicate._expr)
return RustLogicalPlanBuilder(builder)

def limit(self, num_rows: int) -> RustLogicalPlanBuilder:
builder = self._builder.limit(num_rows)
def limit(self, num_rows: int, eager: bool) -> RustLogicalPlanBuilder:
builder = self._builder.limit(num_rows, eager)
return RustLogicalPlanBuilder(builder)

def explode(self, explode_expressions: list[Expression]) -> RustLogicalPlanBuilder:
Expand Down
9 changes: 5 additions & 4 deletions src/daft-plan/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,9 @@ impl LogicalPlanBuilder {
Ok(logical_plan.into())
}

pub fn limit(&self, limit: i64) -> DaftResult<Self> {
let logical_plan: LogicalPlan = logical_ops::Limit::new(self.plan.clone(), limit).into();
pub fn limit(&self, limit: i64, eager: bool) -> DaftResult<Self> {
let logical_plan: LogicalPlan =
logical_ops::Limit::new(self.plan.clone(), limit, eager).into();
Ok(logical_plan.into())
}

Expand Down Expand Up @@ -317,8 +318,8 @@ impl PyLogicalPlanBuilder {
Ok(self.builder.filter(predicate.expr)?.into())
}

pub fn limit(&self, limit: i64) -> PyResult<Self> {
Ok(self.builder.limit(limit)?.into())
pub fn limit(&self, limit: i64, eager: bool) -> PyResult<Self> {
Ok(self.builder.limit(limit, eager)?.into())
}

pub fn explode(&self, to_explode: Vec<PyExpr>) -> PyResult<Self> {
Expand Down
11 changes: 9 additions & 2 deletions src/daft-plan/src/logical_ops/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,17 @@ pub struct Limit {
pub input: Arc<LogicalPlan>,
// Limit on number of rows.
pub limit: i64,
// If true, send tasks eagerly one-at-a-time (minimize time-to-first-result);
// if false, send tasks in waves (maximize throughput).
pub eager: bool,
}

impl Limit {
pub(crate) fn new(input: Arc<LogicalPlan>, limit: i64) -> Self {
Self { input, limit }
pub(crate) fn new(input: Arc<LogicalPlan>, limit: i64, eager: bool) -> Self {
Self {
input,
limit,
eager,
}
}
}
2 changes: 1 addition & 1 deletion src/daft-plan/src/logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ impl LogicalPlan {
input.clone(), projection.clone(), resource_request.clone(),
).unwrap()),
Self::Filter(Filter { predicate, .. }) => Self::Filter(Filter::try_new(input.clone(), predicate.clone()).unwrap()),
Self::Limit(Limit { limit, .. }) => Self::Limit(Limit::new(input.clone(), *limit)),
Self::Limit(Limit { limit, eager, .. }) => Self::Limit(Limit::new(input.clone(), *limit, *eager)),
Self::Explode(Explode { to_explode, .. }) => Self::Explode(Explode::try_new(input.clone(), to_explode.clone()).unwrap()),
Self::Sort(Sort { sort_by, descending, .. }) => Self::Sort(Sort::try_new(input.clone(), sort_by.clone(), descending.clone()).unwrap()),
Self::Repartition(Repartition { num_partitions, partition_by, scheme, .. }) => Self::Repartition(Repartition::new(input.clone(), *num_partitions, partition_by.clone(), scheme.clone())),
Expand Down
16 changes: 8 additions & 8 deletions src/daft-plan/src/optimization/rules/push_down_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ impl OptimizerRule for PushDownLimit {

fn try_optimize(&self, plan: Arc<LogicalPlan>) -> DaftResult<Transformed<Arc<LogicalPlan>>> {
match plan.as_ref() {
LogicalPlan::Limit(LogicalLimit { input, limit }) => {
LogicalPlan::Limit(LogicalLimit { input, limit, .. }) => {
let limit = *limit as usize;
match input.as_ref() {
// Naive commuting with unary ops.
Expand Down Expand Up @@ -120,7 +120,7 @@ mod tests {
Field::new("a", DataType::Int64),
Field::new("b", DataType::Utf8),
])
.limit(5)?
.limit(5, false)?
.build();
let expected = "\
Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), b (Utf8), Limit = 5";
Expand All @@ -140,7 +140,7 @@ mod tests {
],
Some(3),
)
.limit(5)?
.limit(5, false)?
.build();
let expected = "\
Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), b (Utf8), Limit = 3";
Expand All @@ -160,7 +160,7 @@ mod tests {
],
Some(10),
)
.limit(5)?
.limit(5, false)?
.build();
let expected = "\
Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), b (Utf8), Limit = 5";
Expand All @@ -177,7 +177,7 @@ mod tests {
let py_obj = Python::with_gil(|py| py.None());
let schema: Arc<Schema> = Schema::new(vec![Field::new("a", DataType::Int64)])?.into();
let plan = LogicalPlanBuilder::in_memory_scan("foo", py_obj, schema, Default::default())?
.limit(5)?
.limit(5, false)?
.build();
let expected = "\
Limit: 5\
Expand All @@ -196,7 +196,7 @@ mod tests {
Field::new("b", DataType::Utf8),
])
.repartition(1, vec![col("a")], PartitionScheme::Hash)?
.limit(5)?
.limit(5, false)?
.build();
let expected = "\
Repartition: Scheme = Hash, Number of partitions = 1, Partition by = col(a)\
Expand All @@ -215,7 +215,7 @@ mod tests {
Field::new("b", DataType::Utf8),
])
.coalesce(1)?
.limit(5)?
.limit(5, false)?
.build();
let expected = "\
Coalesce: To = 1\
Expand All @@ -234,7 +234,7 @@ mod tests {
Field::new("b", DataType::Utf8),
])
.project(vec![col("a")], Default::default())?
.limit(5)?
.limit(5, false)?
.build();
let expected = "\
Project: col(a), Partition spec = PartitionSpec { scheme: Unknown, num_partitions: 1, by: None }\
Expand Down
9 changes: 8 additions & 1 deletion src/daft-plan/src/physical_ops/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,21 @@ pub struct Limit {
// Upstream node.
pub input: Arc<PhysicalPlan>,
pub limit: i64,
pub eager: bool,
pub num_partitions: usize,
}

impl Limit {
pub(crate) fn new(input: Arc<PhysicalPlan>, limit: i64, num_partitions: usize) -> Self {
pub(crate) fn new(
input: Arc<PhysicalPlan>,
limit: i64,
eager: bool,
num_partitions: usize,
) -> Self {
Self {
input,
limit,
eager,
num_partitions,
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/daft-plan/src/physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ impl PhysicalPlan {
PhysicalPlan::Limit(Limit {
input,
limit,
eager,
num_partitions,
}) => {
let upstream_iter = input.to_partition_tasks(py, psets, is_ray_runner)?;
Expand All @@ -313,7 +314,7 @@ impl PhysicalPlan {
.call1((upstream_iter, *limit))?;
let global_limit_iter = py_physical_plan
.getattr(pyo3::intern!(py, "global_limit"))?
.call1((local_limit_iter, *limit, *num_partitions))?;
.call1((local_limit_iter, *limit, *eager, *num_partitions))?;
Ok(global_limit_iter.into())
}
PhysicalPlan::Explode(Explode { input, to_explode }) => {
Expand Down
7 changes: 6 additions & 1 deletion src/daft-plan/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,16 @@ pub fn plan(logical_plan: &LogicalPlan) -> DaftResult<PhysicalPlan> {
predicate.clone(),
)))
}
LogicalPlan::Limit(LogicalLimit { input, limit }) => {
LogicalPlan::Limit(LogicalLimit {
input,
limit,
eager,
}) => {
let input_physical = plan(input)?;
Ok(PhysicalPlan::Limit(Limit::new(
input_physical.into(),
*limit,
*eager,
logical_plan.partition_spec().num_partitions,
)))
}
Expand Down

0 comments on commit 7fa05b0

Please sign in to comment.