[PERF] Add "eager mode" to limits and use in .show() (#1498)
1. Adds "eager mode" to limits (when enabled, the global limit physical
plan will only yield one in-flight partition at a time instead of
allowing for batched execution)
2. Changes our `limit_pushdown` rule to keep the logical limit node
around after performing limit pushdowns into the Source node:
* This is needed because the logical limit node will be translated into
a Physical global limit
* Refactored some of the changes introduced in #1476 : translation of
Source nodes into the physical plan no longer "implicitly" creates a
physical limit node
3. Use an iterator of Tables in `.show()` instead of relying on
`.collect()`
4. Adds a max buffer size to the results buffer in our RayRunner - this
is so that we can bound the number of results to be buffered to `1` and
prevent runaway asynchronous execution by the RayRunner which would
otherwise run to completion


This PR significantly speeds up `.show()` because:

1. Not all partitions need to be materialized (since we don't run a
`.collect()`)
2. "Eager" mode results in fewer wasted read tasks (and also gets rid of
our `TaskCancelledException` messages)
3. The RayRunner no longer executes all the way to completion, and is
instead bounded by a prefetching max size of 1

---------

Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
jaychia and Jay Chia authored Oct 18, 2023
1 parent bdd2128 commit 76e256a
Showing 18 changed files with 167 additions and 128 deletions.
2 changes: 1 addition & 1 deletion daft/daft.pyi
@@ -720,7 +720,7 @@ class LogicalPlanBuilder:
) -> LogicalPlanBuilder: ...
def project(self, projection: list[PyExpr], resource_request: ResourceRequest) -> LogicalPlanBuilder: ...
def filter(self, predicate: PyExpr) -> LogicalPlanBuilder: ...
def limit(self, limit: int) -> LogicalPlanBuilder: ...
def limit(self, limit: int, eager: bool) -> LogicalPlanBuilder: ...
def explode(self, to_explode: list[PyExpr]) -> LogicalPlanBuilder: ...
def sort(self, sort_by: list[PyExpr], descending: list[bool]) -> LogicalPlanBuilder: ...
def repartition(
29 changes: 18 additions & 11 deletions daft/dataframe/dataframe.py
@@ -169,19 +169,24 @@ def show(self, n: int = 8) -> "DataFrameDisplay":
Returns:
DataFrameDisplay: object that has a rich tabular display
"""
df = self
df = df.limit(n)
df.collect(num_preview_rows=None)
collected_preview = df._preview
assert collected_preview is not None

builder = self._builder.limit(n, eager=True)

# Iteratively retrieve partitions until enough data has been materialized
tables = []
seen = 0
for table in get_context().runner().run_iter_tables(builder, results_buffer_size=1):
tables.append(table)
seen += len(table)
if seen >= n:
break

preview_partition = Table.concat(tables)
preview_partition = preview_partition if len(preview_partition) <= n else preview_partition.slice(0, n)
preview = DataFramePreview(
preview_partition=collected_preview.preview_partition,
# Override dataframe_num_rows=None, because we do not know
# the size of the entire (un-limited) dataframe when showing
preview_partition=preview_partition,
# We do not know the size of the entire (un-limited) dataframe when showing
dataframe_num_rows=None,
)

return DataFrameDisplay(preview, self.schema(), num_rows=n)

@DataframePublicAPI
@@ -586,11 +591,13 @@ def limit(self, num: int) -> "DataFrame":
Args:
num (int): maximum rows to allow.
eager (bool): whether to maximize for latency (time to first result) by eagerly executing
only one partition at a time, or throughput by executing multiple limits at a time
Returns:
DataFrame: Limited DataFrame
"""
builder = self._builder.limit(num)
builder = self._builder.limit(num, eager=False)
return DataFrame(builder)

@DataframePublicAPI
7 changes: 7 additions & 0 deletions daft/execution/physical_plan.py
@@ -277,6 +277,7 @@ def local_limit(
def global_limit(
child_plan: InProgressPhysicalPlan[PartitionT],
limit_rows: int,
eager: bool,
num_partitions: int,
) -> InProgressPhysicalPlan[PartitionT]:
"""Return the first n rows from the `child_plan`."""
@@ -342,6 +343,12 @@ def global_limit(
yield None
continue

# If running in eager mode, only allow one task in flight
if eager and len(materializations) > 0:
logger.debug(f"global_limit blocking on eager execution of: {materializations[0]}")
yield None
continue

# Execute a single child partition.
try:
child_step = child_plan.send(remaining_rows) if started else next(child_plan)
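
To make the `yield None` convention above easier to follow, here is a minimal standalone sketch of the eager gating pattern (a simplification, not the actual implementation: task objects are assumed to expose a `done()` method, and the real `global_limit` also tracks remaining rows and partition counts):

```python
from collections import deque
from typing import Any, Iterator, Optional


def eager_gated_plan(child_tasks: Iterator[Any], eager: bool) -> Iterator[Optional[Any]]:
    """Yield tasks for the scheduler to run, or None to signal "wait and retry".

    In eager mode, at most one task is allowed in flight at a time.
    """
    materializations: deque = deque()
    pending = None
    while True:
        # Retire any tasks the scheduler has completed since the last yield.
        while materializations and materializations[0].done():
            materializations.popleft()

        # Eager mode: hold off on new work until the in-flight task finishes.
        if eager and len(materializations) > 0:
            yield None
            continue

        # Otherwise hand the next child task to the scheduler.
        if pending is None:
            pending = next(child_tasks, None)
        if pending is None:
            return  # child plan exhausted
        materializations.append(pending)
        yield pending
        pending = None
```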
1 change: 1 addition & 0 deletions daft/execution/physical_plan_factory.py
@@ -104,6 +104,7 @@ def _get_physical_plan(node: LogicalPlan, psets: dict[str, list[PartitionT]]) ->
return physical_plan.global_limit(
child_plan=child_plan,
limit_rows=node._num,
eager=node._eager,
num_partitions=node.num_partitions(),
)

2 changes: 1 addition & 1 deletion daft/logical/builder.py
@@ -100,7 +100,7 @@ def filter(self, predicate: Expression) -> LogicalPlanBuilder:
pass

@abstractmethod
def limit(self, num_rows: int) -> LogicalPlanBuilder:
def limit(self, num_rows: int, eager: bool) -> LogicalPlanBuilder:
pass

@abstractmethod
11 changes: 6 additions & 5 deletions daft/logical/logical_plan.py
@@ -150,9 +150,9 @@ def project(
def filter(self, predicate: Expression):
return Filter(self._plan, ExpressionsProjection([predicate])).to_builder()

def limit(self, num_rows: int) -> LogicalPlanBuilder:
def limit(self, num_rows: int, eager: bool) -> LogicalPlanBuilder:
local_limit = LocalLimit(self._plan, num=num_rows)
plan = GlobalLimit(local_limit, num=num_rows)
plan = GlobalLimit(local_limit, num=num_rows, eager=eager)
return plan.to_builder()

def explode(self, explode_expressions: list[Expression]) -> PyLogicalPlanBuilder:
@@ -828,17 +828,18 @@ def rebuild(self) -> LogicalPlan:


class GlobalLimit(UnaryNode):
def __init__(self, input: LogicalPlan, num: int) -> None:
def __init__(self, input: LogicalPlan, num: int, eager: bool) -> None:
super().__init__(input.schema(), partition_spec=input.partition_spec(), op_level=OpLevel.GLOBAL)
self._register_child(input)
self._num = num
self._eager = eager

def __repr__(self) -> str:
return self._repr_helper(num=self._num)

def copy_with_new_children(self, new_children: list[LogicalPlan]) -> LogicalPlan:
assert len(new_children) == 1
return GlobalLimit(new_children[0], self._num)
return GlobalLimit(new_children[0], self._num, self._eager)

def required_columns(self) -> list[set[str]]:
return [set()]
@@ -850,7 +851,7 @@ def _local_eq(self, other: Any) -> bool:
return isinstance(other, GlobalLimit) and self.schema() == other.schema() and self._num == other._num

def rebuild(self) -> LogicalPlan:
return GlobalLimit(input=self._children()[0].rebuild(), num=self._num)
return GlobalLimit(input=self._children()[0].rebuild(), num=self._num, eager=self._eager)


class LocalCount(UnaryNode):
2 changes: 1 addition & 1 deletion daft/logical/optimizer.py
@@ -457,7 +457,7 @@ def _push_down_global_limit_into_unary_node(self, parent: GlobalLimit, child: Un
"""
logger.debug(f"pushing {parent} into {child}")
grandchild = child._children()[0]
return child.copy_with_new_children([GlobalLimit(grandchild, num=parent._num)])
return child.copy_with_new_children([GlobalLimit(grandchild, num=parent._num, eager=parent._eager)])

@property
def _supported_unary_nodes(self) -> set[type[LogicalPlan]]:
4 changes: 2 additions & 2 deletions daft/logical/rust_logical_plan.py
@@ -82,8 +82,8 @@ def filter(self, predicate: Expression) -> RustLogicalPlanBuilder:
builder = self._builder.filter(predicate._expr)
return RustLogicalPlanBuilder(builder)

def limit(self, num_rows: int) -> RustLogicalPlanBuilder:
builder = self._builder.limit(num_rows)
def limit(self, num_rows: int, eager: bool) -> RustLogicalPlanBuilder:
builder = self._builder.limit(num_rows, eager)
return RustLogicalPlanBuilder(builder)

def explode(self, explode_expressions: list[Expression]) -> RustLogicalPlanBuilder:
11 changes: 8 additions & 3 deletions daft/runners/pyrunner.py
@@ -139,7 +139,12 @@ def run(self, builder: LogicalPlanBuilder) -> PartitionCacheEntry:
pset_entry = self.put_partition_set_into_cache(result_pset)
return pset_entry

def run_iter(self, builder: LogicalPlanBuilder) -> Iterator[Table]:
def run_iter(
self,
builder: LogicalPlanBuilder,
# NOTE: PyRunner does not run any async execution, so it ignores `results_buffer_size` which is essentially 0
results_buffer_size: int | None = None,
) -> Iterator[Table]:
# Optimize the logical plan.
builder = builder.optimize()
# Finalize the logical plan and get a physical plan scheduler for translating the
@@ -157,8 +162,8 @@ def run_iter(self, builder: LogicalPlanBuilder) -> Iterator[Table]:
partitions_gen = self._physical_plan_to_partitions(tasks)
yield from partitions_gen

def run_iter_tables(self, builder: LogicalPlanBuilder) -> Iterator[Table]:
return self.run_iter(builder)
def run_iter_tables(self, builder: LogicalPlanBuilder, results_buffer_size: int | None = None) -> Iterator[Table]:
return self.run_iter(builder, results_buffer_size=results_buffer_size)

def _physical_plan_to_partitions(self, plan: physical_plan.MaterializedPhysicalPlan) -> Iterator[Table]:
inflight_tasks: dict[str, PartitionTask] = dict()
14 changes: 9 additions & 5 deletions daft/runners/ray_runner.py
@@ -4,7 +4,6 @@
import threading
import time
import uuid
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime
from queue import Queue
@@ -412,7 +411,7 @@ def __init__(self, max_task_backlog: int | None) -> None:
self.reserved_cores = 0

self.threads_by_df: dict[str, threading.Thread] = dict()
self.results_by_df: dict[str, Queue] = defaultdict(Queue)
self.results_by_df: dict[str, Queue] = {}

def next(self, result_uuid: str) -> ray.ObjectRef | StopIteration:
# Case: thread is terminated and no longer exists.
@@ -435,7 +434,10 @@ def run_plan(
plan_scheduler: PhysicalPlanScheduler,
psets: dict[str, ray.ObjectRef],
result_uuid: str,
results_buffer_size: int | None = None,
) -> None:
self.results_by_df[result_uuid] = Queue(maxsize=results_buffer_size or -1)

t = threading.Thread(
target=self._run_plan,
name=result_uuid,
@@ -624,7 +626,7 @@ def __init__(
max_task_backlog=max_task_backlog,
)

def run_iter(self, builder: LogicalPlanBuilder) -> Iterator[ray.ObjectRef]:
def run_iter(self, builder: LogicalPlanBuilder, results_buffer_size: int | None = None) -> Iterator[ray.ObjectRef]:
# Optimize the logical plan.
builder = builder.optimize()
# Finalize the logical plan and get a physical plan scheduler for translating the
@@ -643,6 +645,7 @@ def run_iter(self, builder: LogicalPlanBuilder) -> Iterator[ray.ObjectRef]:
plan_scheduler=plan_scheduler,
psets=psets,
result_uuid=result_uuid,
results_buffer_size=results_buffer_size,
)
)

@@ -651,6 +654,7 @@ def run_iter(self, builder: LogicalPlanBuilder) -> Iterator[ray.ObjectRef]:
plan_scheduler=plan_scheduler,
psets=psets,
result_uuid=result_uuid,
results_buffer_size=results_buffer_size,
)

while True:
@@ -663,8 +667,8 @@ def run_iter(self, builder: LogicalPlanBuilder) -> Iterator[ray.ObjectRef]:
return
yield result

def run_iter_tables(self, builder: LogicalPlanBuilder) -> Iterator[Table]:
for ref in self.run_iter(builder):
def run_iter_tables(self, builder: LogicalPlanBuilder, results_buffer_size: int | None = None) -> Iterator[Table]:
for ref in self.run_iter(builder, results_buffer_size=results_buffer_size):
yield ray.get(ref)

def run(self, builder: LogicalPlanBuilder) -> PartitionCacheEntry:
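
The backpressure mechanism here is just a bounded `queue.Queue`: `put()` blocks once the buffer is full, pausing the plan-driving thread until the consumer drains a result. A minimal standalone sketch of that effect (the producer/consumer below are illustrative stand-ins, not the actual RayRunner internals):

```python
import threading
from queue import Queue


def produce_results(results: Queue, n: int) -> None:
    # Each put() blocks while the buffer is full, so with maxsize=1 the
    # producer can never run ahead of the consumer by more than one result.
    for i in range(n):
        results.put(f"partition-{i}")
    results.put(StopIteration)  # sentinel marking end of results


# maxsize=1 mirrors results_buffer_size=1; Queue(maxsize=-1) would be unbounded.
buffer: Queue = Queue(maxsize=1)
threading.Thread(target=produce_results, args=(buffer, 5), daemon=True).start()

while True:
    item = buffer.get()
    if item is StopIteration:
        break
    print(item)  # consume one result at a time; the producer resumes after each get()
```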
20 changes: 16 additions & 4 deletions daft/runners/runner.py
@@ -34,11 +34,23 @@ def run(self, builder: LogicalPlanBuilder) -> PartitionCacheEntry:
...

@abstractmethod
def run_iter(self, builder: LogicalPlanBuilder) -> Iterator[PartitionT]:
"""Similar to run(), but yield the individual partitions as they are completed."""
def run_iter(self, builder: LogicalPlanBuilder, results_buffer_size: int | None = None) -> Iterator[PartitionT]:
"""Similar to run(), but yield the individual partitions as they are completed.
Args:
builder: the builder for the LogicalPlan that is to be executed
results_buffer_size: if the plan is executed asynchronously, this is the maximum size of the number of results
that can be buffered before execution should pause and wait.
"""
...

@abstractmethod
def run_iter_tables(self, builder: LogicalPlanBuilder) -> Iterator[Table]:
"""Similar to run_iter(), but always dereference and yield Table objects."""
def run_iter_tables(self, builder: LogicalPlanBuilder, results_buffer_size: int | None = None) -> Iterator[Table]:
"""Similar to run_iter(), but always dereference and yield Table objects.
Args:
builder: the builder for the LogicalPlan that is to be executed
results_buffer_size: if the plan is executed asynchronously, this is the maximum size of the number of results
that can be buffered before execution should pause and wait.
"""
...
9 changes: 5 additions & 4 deletions src/daft-plan/src/builder.rs
@@ -119,8 +119,9 @@ impl LogicalPlanBuilder {
Ok(logical_plan.into())
}

pub fn limit(&self, limit: i64) -> DaftResult<Self> {
let logical_plan: LogicalPlan = logical_ops::Limit::new(self.plan.clone(), limit).into();
pub fn limit(&self, limit: i64, eager: bool) -> DaftResult<Self> {
let logical_plan: LogicalPlan =
logical_ops::Limit::new(self.plan.clone(), limit, eager).into();
Ok(logical_plan.into())
}

@@ -317,8 +318,8 @@ impl PyLogicalPlanBuilder {
Ok(self.builder.filter(predicate.expr)?.into())
}

pub fn limit(&self, limit: i64) -> PyResult<Self> {
Ok(self.builder.limit(limit)?.into())
pub fn limit(&self, limit: i64, eager: bool) -> PyResult<Self> {
Ok(self.builder.limit(limit, eager)?.into())
}

pub fn explode(&self, to_explode: Vec<PyExpr>) -> PyResult<Self> {
11 changes: 9 additions & 2 deletions src/daft-plan/src/logical_ops/limit.rs
@@ -8,10 +8,17 @@ pub struct Limit {
pub input: Arc<LogicalPlan>,
// Limit on number of rows.
pub limit: i64,
// Whether to send tasks in waves (maximize throughput) or
// eagerly one-at-a-time (maximize time-to-first-result)
pub eager: bool,
}

impl Limit {
pub(crate) fn new(input: Arc<LogicalPlan>, limit: i64) -> Self {
Self { input, limit }
pub(crate) fn new(input: Arc<LogicalPlan>, limit: i64, eager: bool) -> Self {
Self {
input,
limit,
eager,
}
}
}
2 changes: 1 addition & 1 deletion src/daft-plan/src/logical_plan.rs
@@ -226,7 +226,7 @@ impl LogicalPlan {
input.clone(), projection.clone(), resource_request.clone(),
).unwrap()),
Self::Filter(Filter { predicate, .. }) => Self::Filter(Filter::try_new(input.clone(), predicate.clone()).unwrap()),
Self::Limit(Limit { limit, .. }) => Self::Limit(Limit::new(input.clone(), *limit)),
Self::Limit(Limit { limit, eager, .. }) => Self::Limit(Limit::new(input.clone(), *limit, *eager)),
Self::Explode(Explode { to_explode, .. }) => Self::Explode(Explode::try_new(input.clone(), to_explode.clone()).unwrap()),
Self::Sort(Sort { sort_by, descending, .. }) => Self::Sort(Sort::try_new(input.clone(), sort_by.clone(), descending.clone()).unwrap()),
Self::Repartition(Repartition { num_partitions, partition_by, scheme, .. }) => Self::Repartition(Repartition::new(input.clone(), *num_partitions, partition_by.clone(), scheme.clone())),