Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT] Enable Progress Bars for PyRunner and RayRunner #1609

Merged
merged 13 commits into from
Nov 17, 2023
12 changes: 10 additions & 2 deletions daft/execution/execution_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class PartitionTask(Generic[PartitionT]):
instructions: list[Instruction]
resource_request: ResourceRequest
num_results: int
stage_id: int
_id: int = field(default_factory=lambda: next(ID_GEN))

def id(self) -> str:
Expand Down Expand Up @@ -110,7 +111,7 @@ def add_instruction(
self.num_results = instruction.num_outputs()
return self

def finalize_partition_task_single_output(self) -> SingleOutputPartitionTask[PartitionT]:
def finalize_partition_task_single_output(self, stage_id: int) -> SingleOutputPartitionTask[PartitionT]:
"""Create a SingleOutputPartitionTask from this PartitionTaskBuilder.

Returns a "frozen" version of this PartitionTask that cannot have instructions added.
Expand All @@ -125,12 +126,13 @@ def finalize_partition_task_single_output(self) -> SingleOutputPartitionTask[Par

return SingleOutputPartitionTask[PartitionT](
inputs=self.inputs,
stage_id=stage_id,
instructions=self.instructions,
num_results=1,
resource_request=resource_request_final_cpu,
)

def finalize_partition_task_multi_output(self) -> MultiOutputPartitionTask[PartitionT]:
def finalize_partition_task_multi_output(self, stage_id: int) -> MultiOutputPartitionTask[PartitionT]:
"""Create a MultiOutputPartitionTask from this PartitionTaskBuilder.

Same as finalize_partition_task_single_output, except the output of this PartitionTask is a list of partitions.
Expand All @@ -143,6 +145,7 @@ def finalize_partition_task_multi_output(self) -> MultiOutputPartitionTask[Parti
)
return MultiOutputPartitionTask[PartitionT](
inputs=self.inputs,
stage_id=stage_id,
instructions=self.instructions,
num_results=self.num_results,
resource_request=resource_request_final_cpu,
Expand Down Expand Up @@ -566,6 +569,11 @@ def run_partial_metadata(self, input_metadatas: list[PartialPartitionMetadata])
]


@dataclass(frozen=True)
class GlobalLimit(LocalLimit):
    # Behaves exactly like LocalLimit at runtime (no overrides). The distinct
    # subclass exists so the instruction emitted by the global-limit plan can be
    # told apart from a plain local limit by type checks —
    # NOTE(review): purpose inferred from the diff (GlobalLimit is swapped in
    # where global_limit previously emitted LocalLimit); confirm against the
    # runner/progress-bar code that consumes instruction types.
    pass


@dataclass(frozen=True)
class MapPartition(SingleOutputInstruction):
map_op: MapPartitionOp
Expand Down
71 changes: 45 additions & 26 deletions daft/execution/physical_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,16 @@
MaterializedPhysicalPlan = Iterator[Union[None, PartitionTask[PartitionT], PartitionT]]


def _stage_id_counter():
counter = 0
while True:
counter += 1
yield counter


stage_id_counter = _stage_id_counter()


def partition_read(
partitions: Iterator[PartitionT], metadatas: Iterator[PartialPartitionMetadata] | None = None
) -> InProgressPhysicalPlan[PartitionT]:
Expand Down Expand Up @@ -81,6 +91,7 @@ def file_read(
Yield a plan to read those filenames.
"""
materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()
stage_id = next(stage_id_counter)
output_partition_index = 0

while True:
Expand Down Expand Up @@ -119,7 +130,7 @@ def file_read(
try:
child_step = next(child_plan)
if isinstance(child_step, PartitionTaskBuilder):
child_step = child_step.finalize_partition_task_single_output()
child_step = child_step.finalize_partition_task_single_output(stage_id=stage_id)
materializations.append(child_step)
yield child_step

Expand Down Expand Up @@ -185,7 +196,7 @@ def join(
# As the materializations complete, emit new steps to join each left and right partition.
left_requests: deque[SingleOutputPartitionTask[PartitionT]] = deque()
right_requests: deque[SingleOutputPartitionTask[PartitionT]] = deque()

stage_id = next(stage_id_counter)
yield_left = True

while True:
Expand Down Expand Up @@ -237,7 +248,7 @@ def join(
try:
step = next(next_plan)
if isinstance(step, PartitionTaskBuilder):
step = step.finalize_partition_task_single_output()
step = step.finalize_partition_task_single_output(stage_id=stage_id)
next_requests.append(step)
yield step

Expand All @@ -246,9 +257,9 @@ def join(
# Are we still waiting for materializations to complete? (We will emit more joins from them).
if len(left_requests) + len(right_requests) > 0:
logger.debug(
"join blocked on completion of sources.\n"
f"Left sources: {left_requests}\n"
f"Right sources: {right_requests}",
"join blocked on completion of sources.\n Left sources: %s\nRight sources: %s",
left_requests,
right_requests,
)
yield None

Expand Down Expand Up @@ -302,7 +313,7 @@ def global_limit(
remaining_partitions = num_partitions

materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()

stage_id = next(stage_id_counter)
# To dynamically schedule the global limit, we need to apply an appropriate limit to each child partition.
# We don't know their exact sizes since they are pending execution, so we will have to iteratively execute them,
# count their rows, and then apply and update the remaining limit.
Expand All @@ -317,16 +328,17 @@ def global_limit(
# Apply and deduct the rolling global limit.
while len(materializations) > 0 and materializations[0].done():
done_task = materializations.popleft()

limit = remaining_rows and min(remaining_rows, done_task.partition_metadata().num_rows)
done_task_metadata = done_task.partition_metadata()
limit = remaining_rows and min(remaining_rows, done_task_metadata.num_rows)

global_limit_step = PartitionTaskBuilder[PartitionT](
inputs=[done_task.partition()],
partial_metadatas=[done_task.partition_metadata()],
resource_request=ResourceRequest(memory_bytes=done_task.partition_metadata().size_bytes),
partial_metadatas=[done_task_metadata],
resource_request=ResourceRequest(memory_bytes=done_task_metadata.size_bytes),
).add_instruction(
instruction=execution_step.LocalLimit(limit),
instruction=execution_step.GlobalLimit(limit),
)

yield global_limit_step
remaining_partitions -= 1
remaining_rows -= limit
Expand All @@ -346,7 +358,7 @@ def global_limit(
partial_metadatas=[done_task.partition_metadata()],
resource_request=ResourceRequest(memory_bytes=done_task.partition_metadata().size_bytes),
).add_instruction(
instruction=execution_step.LocalLimit(0),
instruction=execution_step.GlobalLimit(0),
)
for _ in range(remaining_partitions)
)
Expand Down Expand Up @@ -376,10 +388,11 @@ def global_limit(
if len(materializations) == 0 and remaining_rows > 0 and partial_meta.num_rows is not None:
limit = min(remaining_rows, partial_meta.num_rows)
child_step = child_step.add_instruction(instruction=execution_step.LocalLimit(limit))

remaining_partitions -= 1
remaining_rows -= limit
else:
child_step = child_step.finalize_partition_task_single_output()
child_step = child_step.finalize_partition_task_single_output(stage_id=stage_id)
materializations.append(child_step)
yield child_step

Expand All @@ -395,7 +408,7 @@ def flatten_plan(child_plan: InProgressPhysicalPlan[PartitionT]) -> InProgressPh
"""Wrap a plan that emits multi-output tasks to a plan that emits single-output tasks."""

materializations: deque[MultiOutputPartitionTask[PartitionT]] = deque()

stage_id = next(stage_id_counter)
while True:
while len(materializations) > 0 and materializations[0].done():
done_task = materializations.popleft()
Expand All @@ -409,7 +422,7 @@ def flatten_plan(child_plan: InProgressPhysicalPlan[PartitionT]) -> InProgressPh
try:
step = next(child_plan)
if isinstance(step, PartitionTaskBuilder):
step = step.finalize_partition_task_multi_output()
step = step.finalize_partition_task_multi_output(stage_id=stage_id)
materializations.append(step)
yield step

Expand All @@ -436,10 +449,10 @@ def split(
# Splitting evenly is fairly important if this operation is to be used for parallelism.
# (optimization TODO: don't materialize if num_rows is already available in physical plan metadata.)
materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()

stage_id = next(stage_id_counter)
for step in child_plan:
if isinstance(step, PartitionTaskBuilder):
step = step.finalize_partition_task_single_output()
step = step.finalize_partition_task_single_output(stage_id=stage_id)
materializations.append(step)
yield step

Expand Down Expand Up @@ -503,7 +516,7 @@ def coalesce(
merges_per_result = deque([stop - start for start, stop in zip(starts, stops)])

materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()

stage_id = next(stage_id_counter)
while True:
# See if we can emit a coalesced partition.
num_partitions_to_merge = merges_per_result[0]
Expand Down Expand Up @@ -545,7 +558,7 @@ def coalesce(
try:
child_step = next(child_plan)
if isinstance(child_step, PartitionTaskBuilder):
child_step = child_step.finalize_partition_task_single_output()
child_step = child_step.finalize_partition_task_single_output(stage_id)
materializations.append(child_step)
yield child_step

Expand All @@ -570,11 +583,12 @@ def reduce(
"""

materializations = list()
stage_id = next(stage_id_counter)

# Dispatch all fanouts.
for step in fanout_plan:
if isinstance(step, PartitionTaskBuilder):
step = step.finalize_partition_task_multi_output()
step = step.finalize_partition_task_multi_output(stage_id=stage_id)
materializations.append(step)
yield step

Expand Down Expand Up @@ -611,14 +625,17 @@ def sort(

# First, materialize the child plan.
source_materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()
stage_id_children = next(stage_id_counter)
for step in child_plan:
if isinstance(step, PartitionTaskBuilder):
step = step.finalize_partition_task_single_output()
step = step.finalize_partition_task_single_output(stage_id=stage_id_children)
source_materializations.append(step)
yield step

# Sample all partitions (to be used for calculating sort boundaries).
sample_materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()
stage_id_sampling = next(stage_id_counter)

for source in source_materializations:
while not source.done():
logger.debug("sort blocked on completion of source: %s", source)
Expand All @@ -632,7 +649,7 @@ def sort(
.add_instruction(
instruction=execution_step.Sample(sort_by=sort_by),
)
.finalize_partition_task_single_output()
.finalize_partition_task_single_output(stage_id=stage_id_sampling)
)

sample_materializations.append(sample)
Expand All @@ -643,6 +660,8 @@ def sort(
logger.debug("sort blocked on completion of all samples: %s", sample_materializations)
yield None

stage_id_reduce = next(stage_id_counter)

# Reduce the samples to get sort boundaries.
boundaries = (
PartitionTaskBuilder[PartitionT](
Expand All @@ -656,7 +675,7 @@ def sort(
descending=descending,
),
)
.finalize_partition_task_single_output()
.finalize_partition_task_single_output(stage_id=stage_id_reduce)
)
yield boundaries

Expand Down Expand Up @@ -714,7 +733,7 @@ def materialize(
"""

materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()

stage_id = next(stage_id_counter)
while True:
# Check if any inputs finished executing.
while len(materializations) > 0 and materializations[0].done():
Expand All @@ -725,7 +744,7 @@ def materialize(
try:
step = next(child_plan)
if isinstance(step, PartitionTaskBuilder):
step = step.finalize_partition_task_single_output()
step = step.finalize_partition_task_single_output(stage_id=stage_id)
materializations.append(step)
assert isinstance(step, (PartitionTask, type(None)))

Expand Down
3 changes: 3 additions & 0 deletions daft/expressions/expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,9 @@
fields = [e._to_field(schema) for e in self]
return Schema._from_field_name_and_types([(f.name, f.dtype) for f in fields])

def __repr__(self) -> str:
    """Render the projection as the view of its expression values.

    Produces the same text as ``str()`` of ``self._output_name_to_exprs.values()``,
    e.g. ``dict_values([...])`` — NOTE(review): assumes ``_output_name_to_exprs``
    is a dict-like attribute set by the enclosing class; confirm in full file.
    """
    return str(self._output_name_to_exprs.values())

Check warning on line 758 in daft/expressions/expressions.py

View check run for this annotation

Codecov / codecov/patch

daft/expressions/expressions.py#L758

Added line #L758 was not covered by tests


class ExpressionImageNamespace(ExpressionNamespace):
"""Expression operations for image columns."""
Expand Down
Loading
Loading