[FEAT] Enable Progress Bars for PyRunner and RayRunner (#1609)
* Enables progress bars when running in interactive mode (Notebook or IPython)
* Implementation for PyRunner, RayRunner[Local] and RayRunner[Remote]


![image](https://github.com/Eventual-Inc/Daft/assets/2550285/1b8eb1b3-c6e8-492c-9fc8-a23cc91f2260)
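
For reference, the bars can also be switched off via environment variables; a minimal sketch (the toy DataFrame is hypothetical, but `DAFT_PROGRESS_BAR` and `RAY_TQDM` are the toggles read by `daft/runners/progress_bar.py` in this diff):

```python
import os

# Both variables default to "1"; setting either to "0" disables the bars
# (see ProgressBar.__init__ in daft/runners/progress_bar.py below).
os.environ["DAFT_PROGRESS_BAR"] = "0"
os.environ["RAY_TQDM"] = "0"

import daft  # assumes daft and tqdm are installed

df = daft.from_pydict({"x": [1, 2, 3]})  # hypothetical toy frame
df.collect()  # executes without rendering tqdm bars
```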

---------

Co-authored-by: Jay Chia <[email protected]>
samster25 and jaychia authored Nov 17, 2023
1 parent 75521e2 commit 24c758e
Showing 7 changed files with 149 additions and 42 deletions.
12 changes: 10 additions & 2 deletions daft/execution/execution_step.py
@@ -51,6 +51,7 @@ class PartitionTask(Generic[PartitionT]):
instructions: list[Instruction]
resource_request: ResourceRequest
num_results: int
+ stage_id: int
_id: int = field(default_factory=lambda: next(ID_GEN))

def id(self) -> str:
@@ -110,7 +111,7 @@ def add_instruction(
self.num_results = instruction.num_outputs()
return self

- def finalize_partition_task_single_output(self) -> SingleOutputPartitionTask[PartitionT]:
+ def finalize_partition_task_single_output(self, stage_id: int) -> SingleOutputPartitionTask[PartitionT]:
"""Create a SingleOutputPartitionTask from this PartitionTaskBuilder.
Returns a "frozen" version of this PartitionTask that cannot have instructions added.
@@ -125,12 +126,13 @@ def finalize_partition_task_single_output(self) -> SingleOutputPartitionTask[PartitionT]:

return SingleOutputPartitionTask[PartitionT](
inputs=self.inputs,
+ stage_id=stage_id,
instructions=self.instructions,
num_results=1,
resource_request=resource_request_final_cpu,
)

- def finalize_partition_task_multi_output(self) -> MultiOutputPartitionTask[PartitionT]:
+ def finalize_partition_task_multi_output(self, stage_id: int) -> MultiOutputPartitionTask[PartitionT]:
"""Create a MultiOutputPartitionTask from this PartitionTaskBuilder.
Same as finalize_partition_task_single_output, except the output of this PartitionTask is a list of partitions.
@@ -143,6 +145,7 @@ def finalize_partition_task_multi_output(self) -> MultiOutputPartitionTask[PartitionT]:
)
return MultiOutputPartitionTask[PartitionT](
inputs=self.inputs,
+ stage_id=stage_id,
instructions=self.instructions,
num_results=self.num_results,
resource_request=resource_request_final_cpu,
@@ -566,6 +569,11 @@ def run_partial_metadata(self, input_metadatas: list[PartialPartitionMetadata])
]


+ @dataclass(frozen=True)
+ class GlobalLimit(LocalLimit):
+     pass


@dataclass(frozen=True)
class MapPartition(SingleOutputInstruction):
map_op: MapPartitionOp
71 changes: 45 additions & 26 deletions daft/execution/physical_plan.py
@@ -53,6 +53,16 @@
MaterializedPhysicalPlan = Iterator[Union[None, PartitionTask[PartitionT], PartitionT]]


+ def _stage_id_counter():
+     counter = 0
+     while True:
+         counter += 1
+         yield counter
+
+
+ stage_id_counter = _stage_id_counter()


def partition_read(
partitions: Iterator[PartitionT], metadatas: Iterator[PartialPartitionMetadata] | None = None
) -> InProgressPhysicalPlan[PartitionT]:
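
Each operator in this plan file pulls one id from the module-level `stage_id_counter` and stamps it on every task it finalizes, so all tasks belonging to one operator share a progress bar. A self-contained sketch of the scheme, using `itertools.count` as an assumed stand-in equivalent to `_stage_id_counter`:

```python
from itertools import count

stage_id_counter = count(1)  # same sequence as _stage_id_counter(): 1, 2, 3, ...

# Hypothetical operators each claim a single id for all of their tasks:
file_read_stage = next(stage_id_counter)  # -> 1
join_stage = next(stage_id_counter)       # -> 2
assert (file_read_stage, join_stage) == (1, 2)
```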
@@ -81,6 +91,7 @@ def file_read(
Yield a plan to read those filenames.
"""
materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()
+ stage_id = next(stage_id_counter)
output_partition_index = 0

while True:
@@ -119,7 +130,7 @@
try:
child_step = next(child_plan)
if isinstance(child_step, PartitionTaskBuilder):
- child_step = child_step.finalize_partition_task_single_output()
+ child_step = child_step.finalize_partition_task_single_output(stage_id=stage_id)
materializations.append(child_step)
yield child_step

@@ -185,7 +196,7 @@ def join(
# As the materializations complete, emit new steps to join each left and right partition.
left_requests: deque[SingleOutputPartitionTask[PartitionT]] = deque()
right_requests: deque[SingleOutputPartitionTask[PartitionT]] = deque()

+ stage_id = next(stage_id_counter)
yield_left = True

while True:
@@ -237,7 +248,7 @@ def join(
try:
step = next(next_plan)
if isinstance(step, PartitionTaskBuilder):
- step = step.finalize_partition_task_single_output()
+ step = step.finalize_partition_task_single_output(stage_id=stage_id)
next_requests.append(step)
yield step

@@ -246,9 +257,9 @@
# Are we still waiting for materializations to complete? (We will emit more joins from them).
if len(left_requests) + len(right_requests) > 0:
logger.debug(
"join blocked on completion of sources.\n"
f"Left sources: {left_requests}\n"
f"Right sources: {right_requests}",
"join blocked on completion of sources.\n Left sources: %s\nRight sources: %s",
left_requests,
right_requests,
)
yield None

@@ -302,7 +313,7 @@ def global_limit(
remaining_partitions = num_partitions

materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()

+ stage_id = next(stage_id_counter)
# To dynamically schedule the global limit, we need to apply an appropriate limit to each child partition.
# We don't know their exact sizes since they are pending execution, so we will have to iteratively execute them,
# count their rows, and then apply and update the remaining limit.
@@ -317,16 +328,17 @@
# Apply and deduct the rolling global limit.
while len(materializations) > 0 and materializations[0].done():
done_task = materializations.popleft()

- limit = remaining_rows and min(remaining_rows, done_task.partition_metadata().num_rows)
+ done_task_metadata = done_task.partition_metadata()
+ limit = remaining_rows and min(remaining_rows, done_task_metadata.num_rows)

global_limit_step = PartitionTaskBuilder[PartitionT](
inputs=[done_task.partition()],
- partial_metadatas=[done_task.partition_metadata()],
- resource_request=ResourceRequest(memory_bytes=done_task.partition_metadata().size_bytes),
+ partial_metadatas=[done_task_metadata],
+ resource_request=ResourceRequest(memory_bytes=done_task_metadata.size_bytes),
).add_instruction(
- instruction=execution_step.LocalLimit(limit),
+ instruction=execution_step.GlobalLimit(limit),
)

yield global_limit_step
remaining_partitions -= 1
remaining_rows -= limit
@@ -346,7 +358,7 @@ def global_limit(
partial_metadatas=[done_task.partition_metadata()],
resource_request=ResourceRequest(memory_bytes=done_task.partition_metadata().size_bytes),
).add_instruction(
- instruction=execution_step.LocalLimit(0),
+ instruction=execution_step.GlobalLimit(0),
)
for _ in range(remaining_partitions)
)
@@ -376,10 +388,11 @@ def global_limit(
if len(materializations) == 0 and remaining_rows > 0 and partial_meta.num_rows is not None:
limit = min(remaining_rows, partial_meta.num_rows)
child_step = child_step.add_instruction(instruction=execution_step.LocalLimit(limit))

remaining_partitions -= 1
remaining_rows -= limit
else:
- child_step = child_step.finalize_partition_task_single_output()
+ child_step = child_step.finalize_partition_task_single_output(stage_id=stage_id)
materializations.append(child_step)
yield child_step
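
The rolling-limit bookkeeping described in the comments above boils down to simple arithmetic; a standalone sketch with hypothetical partition sizes:

```python
# A global LIMIT of 100 rows applied across partitions of 70, 50, and 30 rows:
remaining_rows = 100
for num_rows in [70, 50, 30]:
    limit = min(remaining_rows, num_rows)  # rows this partition may keep
    remaining_rows -= limit
    print(limit)  # prints 70, then 30, then 0; later partitions are emptied
```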

@@ -395,7 +408,7 @@ def flatten_plan(child_plan: InProgressPhysicalPlan[PartitionT]) -> InProgressPhysicalPlan[PartitionT]:
"""Wrap a plan that emits multi-output tasks to a plan that emits single-output tasks."""

materializations: deque[MultiOutputPartitionTask[PartitionT]] = deque()

+ stage_id = next(stage_id_counter)
while True:
while len(materializations) > 0 and materializations[0].done():
done_task = materializations.popleft()
@@ -409,7 +422,7 @@ def flatten_plan(child_plan: InProgressPhysicalPlan[PartitionT]) -> InProgressPhysicalPlan[PartitionT]:
try:
step = next(child_plan)
if isinstance(step, PartitionTaskBuilder):
- step = step.finalize_partition_task_multi_output()
+ step = step.finalize_partition_task_multi_output(stage_id=stage_id)
materializations.append(step)
yield step

@@ -436,10 +449,10 @@ def split(
# Splitting evenly is fairly important if this operation is to be used for parallelism.
# (optimization TODO: don't materialize if num_rows is already available in physical plan metadata.)
materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()

+ stage_id = next(stage_id_counter)
for step in child_plan:
if isinstance(step, PartitionTaskBuilder):
- step = step.finalize_partition_task_single_output()
+ step = step.finalize_partition_task_single_output(stage_id=stage_id)
materializations.append(step)
yield step

@@ -503,7 +516,7 @@ def coalesce(
merges_per_result = deque([stop - start for start, stop in zip(starts, stops)])

materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()

+ stage_id = next(stage_id_counter)
while True:
# See if we can emit a coalesced partition.
num_partitions_to_merge = merges_per_result[0]
@@ -545,7 +558,7 @@ def coalesce(
try:
child_step = next(child_plan)
if isinstance(child_step, PartitionTaskBuilder):
- child_step = child_step.finalize_partition_task_single_output()
+ child_step = child_step.finalize_partition_task_single_output(stage_id)
materializations.append(child_step)
yield child_step

@@ -570,11 +583,12 @@ def reduce(
"""

materializations = list()
+ stage_id = next(stage_id_counter)

# Dispatch all fanouts.
for step in fanout_plan:
if isinstance(step, PartitionTaskBuilder):
- step = step.finalize_partition_task_multi_output()
+ step = step.finalize_partition_task_multi_output(stage_id=stage_id)
materializations.append(step)
yield step

@@ -611,14 +625,17 @@ def sort(

# First, materialize the child plan.
source_materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()
+ stage_id_children = next(stage_id_counter)
for step in child_plan:
if isinstance(step, PartitionTaskBuilder):
- step = step.finalize_partition_task_single_output()
+ step = step.finalize_partition_task_single_output(stage_id=stage_id_children)
source_materializations.append(step)
yield step

# Sample all partitions (to be used for calculating sort boundaries).
sample_materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()
+ stage_id_sampling = next(stage_id_counter)
+
for source in source_materializations:
while not source.done():
logger.debug("sort blocked on completion of source: %s", source)
@@ -632,7 +649,7 @@
.add_instruction(
instruction=execution_step.Sample(sort_by=sort_by),
)
- .finalize_partition_task_single_output()
+ .finalize_partition_task_single_output(stage_id=stage_id_sampling)
)

sample_materializations.append(sample)
@@ -643,6 +660,8 @@
logger.debug("sort blocked on completion of all samples: %s", sample_materializations)
yield None

+ stage_id_reduce = next(stage_id_counter)
+
# Reduce the samples to get sort boundaries.
boundaries = (
PartitionTaskBuilder[PartitionT](
@@ -656,7 +675,7 @@
descending=descending,
),
)
- .finalize_partition_task_single_output()
+ .finalize_partition_task_single_output(stage_id=stage_id_reduce)
)
yield boundaries

@@ -714,7 +733,7 @@ def materialize(
"""

materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()

+ stage_id = next(stage_id_counter)
while True:
# Check if any inputs finished executing.
while len(materializations) > 0 and materializations[0].done():
Expand All @@ -725,7 +744,7 @@ def materialize(
try:
step = next(child_plan)
if isinstance(step, PartitionTaskBuilder):
- step = step.finalize_partition_task_single_output()
+ step = step.finalize_partition_task_single_output(stage_id=stage_id)
materializations.append(step)
assert isinstance(step, (PartitionTask, type(None)))

3 changes: 3 additions & 0 deletions daft/expressions/expressions.py
@@ -754,6 +754,9 @@ def resolve_schema(self, schema: Schema) -> Schema:
fields = [e._to_field(schema) for e in self]
return Schema._from_field_name_and_types([(f.name, f.dtype) for f in fields])

+ def __repr__(self) -> str:
+     return f"{self._output_name_to_exprs.values()}"


class ExpressionImageNamespace(ExpressionNamespace):
"""Expression operations for image columns."""
62 changes: 62 additions & 0 deletions daft/runners/progress_bar.py
@@ -0,0 +1,62 @@
from __future__ import annotations

import os
from typing import Any

from tqdm.auto import tqdm

from daft.execution.execution_step import PartitionTask


class ProgressBar:
    def __init__(self, use_ray_tqdm: bool, show_tasks_bar: bool = False, disable: bool = False) -> None:
        self.use_ray_tqdm = use_ray_tqdm
        self.show_tasks_bar = show_tasks_bar
        self.tqdm_mod = tqdm
        self.pbars: dict[int, tqdm] = dict()
        self.disable = (
            disable
            or not bool(int(os.environ.get("RAY_TQDM", "1")))
            or not bool(int(os.environ.get("DAFT_PROGRESS_BAR", "1")))
        )

    def _make_new_bar(self, stage_id: int, name: str):
        if self.use_ray_tqdm:
            self.pbars[stage_id] = self.tqdm_mod(total=1, desc=name, position=len(self.pbars))
        else:
            self.pbars[stage_id] = self.tqdm_mod(
                total=1, desc=name, position=len(self.pbars), leave=False, mininterval=1.0
            )

    def mark_task_start(self, step: PartitionTask[Any]) -> None:
        if self.disable:
            return
        if self.show_tasks_bar:
            if len(self.pbars) == 0:
                self._make_new_bar(-1, "Tasks")
            else:
                task_pbar = self.pbars[-1]
                task_pbar.total += 1

        stage_id = step.stage_id

        if stage_id not in self.pbars:
            name = "-".join(i.__class__.__name__ for i in step.instructions)
            self._make_new_bar(stage_id, name)
        else:
            pb = self.pbars[stage_id]
            pb.total += 1

    def mark_task_done(self, step: PartitionTask[Any]) -> None:
        if self.disable:
            return

        stage_id = step.stage_id
        self.pbars[stage_id].update(1)
        if self.show_tasks_bar:
            self.pbars[-1].update(1)

    def close(self) -> None:
        for p in self.pbars.values():
            p.close()
            del p
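
A hedged sketch of how a runner is expected to drive this class, based only on the methods above (the commented loop stands in for real `PartitionTask`s; per `mark_task_start`, each bar is named by joining the instruction class names of its stage, e.g. a hypothetical `ReadFile-Project`):

```python
from daft.runners.progress_bar import ProgressBar

pb = ProgressBar(use_ray_tqdm=False, show_tasks_bar=True)
# for step in scheduled_tasks:    # PartitionTasks carrying a stage_id
#     pb.mark_task_start(step)    # creates or grows the bar for step.stage_id
#     ... run the task ...
#     pb.mark_task_done(step)     # advances that stage's bar and the "Tasks" bar
pb.close()  # close all bars once the plan is exhausted
```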
