diff --git a/daft/execution/execution_step.py b/daft/execution/execution_step.py
index 80a2fbbfcf..edfcd2b2ad 100644
--- a/daft/execution/execution_step.py
+++ b/daft/execution/execution_step.py
@@ -51,6 +51,7 @@ class PartitionTask(Generic[PartitionT]):
     instructions: list[Instruction]
     resource_request: ResourceRequest
     num_results: int
+    stage_id: int
     _id: int = field(default_factory=lambda: next(ID_GEN))
 
     def id(self) -> str:
@@ -110,7 +111,7 @@ def add_instruction(
         self.num_results = instruction.num_outputs()
         return self
 
-    def finalize_partition_task_single_output(self) -> SingleOutputPartitionTask[PartitionT]:
+    def finalize_partition_task_single_output(self, stage_id: int) -> SingleOutputPartitionTask[PartitionT]:
         """Create a SingleOutputPartitionTask from this PartitionTaskBuilder.
 
         Returns a "frozen" version of this PartitionTask that cannot have instructions added.
@@ -125,12 +126,13 @@ def finalize_partition_task_single_output(self) -> SingleOutputPartitionTask[Par
 
         return SingleOutputPartitionTask[PartitionT](
             inputs=self.inputs,
+            stage_id=stage_id,
             instructions=self.instructions,
             num_results=1,
             resource_request=resource_request_final_cpu,
         )
 
-    def finalize_partition_task_multi_output(self) -> MultiOutputPartitionTask[PartitionT]:
+    def finalize_partition_task_multi_output(self, stage_id: int) -> MultiOutputPartitionTask[PartitionT]:
         """Create a MultiOutputPartitionTask from this PartitionTaskBuilder.
 
         Same as finalize_partition_task_single_output, except the output of this PartitionTask is a list of partitions.
@@ -143,6 +145,7 @@ def finalize_partition_task_multi_output(self) -> MultiOutputPartitionTask[Parti
         )
         return MultiOutputPartitionTask[PartitionT](
             inputs=self.inputs,
+            stage_id=stage_id,
             instructions=self.instructions,
             num_results=self.num_results,
             resource_request=resource_request_final_cpu,
@@ -566,6 +569,11 @@ def run_partial_metadata(self, input_metadatas: list[PartialPartitionMetadata])
         ]
 
 
+@dataclass(frozen=True)
+class GlobalLimit(LocalLimit):
+    pass
+
+
 @dataclass(frozen=True)
 class MapPartition(SingleOutputInstruction):
     map_op: MapPartitionOp
diff --git a/daft/execution/physical_plan.py b/daft/execution/physical_plan.py
index cd3fa8430c..8d527ac6ee 100644
--- a/daft/execution/physical_plan.py
+++ b/daft/execution/physical_plan.py
@@ -53,6 +53,16 @@
 MaterializedPhysicalPlan = Iterator[Union[None, PartitionTask[PartitionT], PartitionT]]
 
 
+def _stage_id_counter():
+    counter = 0
+    while True:
+        counter += 1
+        yield counter
+
+
+stage_id_counter = _stage_id_counter()
+
+
 def partition_read(
     partitions: Iterator[PartitionT], metadatas: Iterator[PartialPartitionMetadata] | None = None
 ) -> InProgressPhysicalPlan[PartitionT]:
@@ -81,6 +91,7 @@ def file_read(
     Yield a plan to read those filenames.
     """
     materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()
+    stage_id = next(stage_id_counter)
     output_partition_index = 0
 
     while True:
@@ -119,7 +130,7 @@ def file_read(
         try:
             child_step = next(child_plan)
             if isinstance(child_step, PartitionTaskBuilder):
-                child_step = child_step.finalize_partition_task_single_output()
+                child_step = child_step.finalize_partition_task_single_output(stage_id=stage_id)
             materializations.append(child_step)
             yield child_step
 
@@ -185,7 +196,7 @@ def join(
     # As the materializations complete, emit new steps to join each left and right partition.
     left_requests: deque[SingleOutputPartitionTask[PartitionT]] = deque()
     right_requests: deque[SingleOutputPartitionTask[PartitionT]] = deque()
-
+    stage_id = next(stage_id_counter)
     yield_left = True
 
     while True:
@@ -237,7 +248,7 @@ def join(
             try:
                 step = next(next_plan)
                 if isinstance(step, PartitionTaskBuilder):
-                    step = step.finalize_partition_task_single_output()
+                    step = step.finalize_partition_task_single_output(stage_id=stage_id)
                 next_requests.append(step)
                 yield step
 
@@ -246,9 +257,9 @@ def join(
         # Are we still waiting for materializations to complete? (We will emit more joins from them).
         if len(left_requests) + len(right_requests) > 0:
             logger.debug(
-                "join blocked on completion of sources.\n"
-                f"Left sources: {left_requests}\n"
-                f"Right sources: {right_requests}",
+                "join blocked on completion of sources.\n Left sources: %s\nRight sources: %s",
+                left_requests,
+                right_requests,
             )
             yield None
 
@@ -302,7 +313,7 @@ def global_limit(
     remaining_partitions = num_partitions
 
     materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()
-
+    stage_id = next(stage_id_counter)
     # To dynamically schedule the global limit, we need to apply an appropriate limit to each child partition.
     # We don't know their exact sizes since they are pending execution, so we will have to iteratively execute them,
     # count their rows, and then apply and update the remaining limit.
@@ -317,16 +328,17 @@ def global_limit(
         # Apply and deduct the rolling global limit.
         while len(materializations) > 0 and materializations[0].done():
            done_task = materializations.popleft()
-
-            limit = remaining_rows and min(remaining_rows, done_task.partition_metadata().num_rows)
+            done_task_metadata = done_task.partition_metadata()
+            limit = remaining_rows and min(remaining_rows, done_task_metadata.num_rows)
 
             global_limit_step = PartitionTaskBuilder[PartitionT](
                 inputs=[done_task.partition()],
-                partial_metadatas=[done_task.partition_metadata()],
-                resource_request=ResourceRequest(memory_bytes=done_task.partition_metadata().size_bytes),
+                partial_metadatas=[done_task_metadata],
+                resource_request=ResourceRequest(memory_bytes=done_task_metadata.size_bytes),
             ).add_instruction(
-                instruction=execution_step.LocalLimit(limit),
+                instruction=execution_step.GlobalLimit(limit),
             )
+
             yield global_limit_step
             remaining_partitions -= 1
             remaining_rows -= limit
@@ -346,7 +358,7 @@ def global_limit(
                         partial_metadatas=[done_task.partition_metadata()],
                         resource_request=ResourceRequest(memory_bytes=done_task.partition_metadata().size_bytes),
                     ).add_instruction(
-                        instruction=execution_step.LocalLimit(0),
+                        instruction=execution_step.GlobalLimit(0),
                     )
                     for _ in range(remaining_partitions)
                 )
@@ -376,10 +388,11 @@ def global_limit(
             if len(materializations) == 0 and remaining_rows > 0 and partial_meta.num_rows is not None:
                 limit = min(remaining_rows, partial_meta.num_rows)
                 child_step = child_step.add_instruction(instruction=execution_step.LocalLimit(limit))
+
                 remaining_partitions -= 1
                 remaining_rows -= limit
             else:
-                child_step = child_step.finalize_partition_task_single_output()
+                child_step = child_step.finalize_partition_task_single_output(stage_id=stage_id)
                 materializations.append(child_step)
 
             yield child_step
@@ -395,7 +408,7 @@ def flatten_plan(child_plan: InProgressPhysicalPlan[PartitionT]) -> InProgressPh
     """Wrap a plan that emits multi-output tasks to a plan that emits single-output tasks."""
 
     materializations: deque[MultiOutputPartitionTask[PartitionT]] = deque()
-
+    stage_id = next(stage_id_counter)
     while True:
         while len(materializations) > 0 and materializations[0].done():
             done_task = materializations.popleft()
@@ -409,7 +422,7 @@ def flatten_plan(child_plan: InProgressPhysicalPlan[PartitionT]) -> InProgressPh
         try:
             step = next(child_plan)
             if isinstance(step, PartitionTaskBuilder):
-                step = step.finalize_partition_task_multi_output()
+                step = step.finalize_partition_task_multi_output(stage_id=stage_id)
             materializations.append(step)
             yield step
 
@@ -436,10 +449,10 @@ def split(
     # Splitting evenly is fairly important if this operation is to be used for parallelism.
     # (optimization TODO: don't materialize if num_rows is already available in physical plan metadata.)
     materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()
-
+    stage_id = next(stage_id_counter)
     for step in child_plan:
         if isinstance(step, PartitionTaskBuilder):
-            step = step.finalize_partition_task_single_output()
+            step = step.finalize_partition_task_single_output(stage_id=stage_id)
         materializations.append(step)
         yield step
 
@@ -503,7 +516,7 @@ def coalesce(
     merges_per_result = deque([stop - start for start, stop in zip(starts, stops)])
 
     materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()
-
+    stage_id = next(stage_id_counter)
     while True:
         # See if we can emit a coalesced partition.
         num_partitions_to_merge = merges_per_result[0]
@@ -545,7 +558,7 @@ def coalesce(
         try:
             child_step = next(child_plan)
             if isinstance(child_step, PartitionTaskBuilder):
-                child_step = child_step.finalize_partition_task_single_output()
+                child_step = child_step.finalize_partition_task_single_output(stage_id)
             materializations.append(child_step)
             yield child_step
 
@@ -570,11 +583,12 @@ def reduce(
     """
 
     materializations = list()
+    stage_id = next(stage_id_counter)
 
     # Dispatch all fanouts.
     for step in fanout_plan:
         if isinstance(step, PartitionTaskBuilder):
-            step = step.finalize_partition_task_multi_output()
+            step = step.finalize_partition_task_multi_output(stage_id=stage_id)
         materializations.append(step)
         yield step
 
@@ -611,14 +625,17 @@ def sort(
 
     # First, materialize the child plan.
     source_materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()
+    stage_id_children = next(stage_id_counter)
     for step in child_plan:
         if isinstance(step, PartitionTaskBuilder):
-            step = step.finalize_partition_task_single_output()
+            step = step.finalize_partition_task_single_output(stage_id=stage_id_children)
         source_materializations.append(step)
         yield step
 
     # Sample all partitions (to be used for calculating sort boundaries).
     sample_materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()
+    stage_id_sampling = next(stage_id_counter)
+
     for source in source_materializations:
         while not source.done():
             logger.debug("sort blocked on completion of source: %s", source)
@@ -632,7 +649,7 @@ def sort(
             .add_instruction(
                 instruction=execution_step.Sample(sort_by=sort_by),
             )
-            .finalize_partition_task_single_output()
+            .finalize_partition_task_single_output(stage_id=stage_id_sampling)
         )
         sample_materializations.append(sample)
 
@@ -643,6 +660,8 @@ def sort(
         logger.debug("sort blocked on completion of all samples: %s", sample_materializations)
         yield None
 
+    stage_id_reduce = next(stage_id_counter)
+
     # Reduce the samples to get sort boundaries.
     boundaries = (
         PartitionTaskBuilder[PartitionT](
@@ -656,7 +675,7 @@ def sort(
                 descending=descending,
             ),
         )
-        .finalize_partition_task_single_output()
+        .finalize_partition_task_single_output(stage_id=stage_id_reduce)
     )
     yield boundaries
 
@@ -714,7 +733,7 @@ def materialize(
     """
 
     materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()
-
+    stage_id = next(stage_id_counter)
     while True:
         # Check if any inputs finished executing.
         while len(materializations) > 0 and materializations[0].done():
@@ -725,7 +744,7 @@ def materialize(
         try:
             step = next(child_plan)
             if isinstance(step, PartitionTaskBuilder):
-                step = step.finalize_partition_task_single_output()
+                step = step.finalize_partition_task_single_output(stage_id=stage_id)
             materializations.append(step)
 
             assert isinstance(step, (PartitionTask, type(None)))
diff --git a/daft/expressions/expressions.py b/daft/expressions/expressions.py
index 3769de2894..e5af9fa1f7 100644
--- a/daft/expressions/expressions.py
+++ b/daft/expressions/expressions.py
@@ -754,6 +754,9 @@ def resolve_schema(self, schema: Schema) -> Schema:
         fields = [e._to_field(schema) for e in self]
         return Schema._from_field_name_and_types([(f.name, f.dtype) for f in fields])
 
+    def __repr__(self) -> str:
+        return f"{self._output_name_to_exprs.values()}"
+
 
 class ExpressionImageNamespace(ExpressionNamespace):
     """Expression operations for image columns."""
diff --git a/daft/runners/progress_bar.py b/daft/runners/progress_bar.py
new file mode 100644
index 0000000000..3a6ff6210d
--- /dev/null
+++ b/daft/runners/progress_bar.py
@@ -0,0 +1,62 @@
+from __future__ import annotations
+
+import os
+from typing import Any
+
+from tqdm.auto import tqdm
+
+from daft.execution.execution_step import PartitionTask
+
+
+class ProgressBar:
+    def __init__(self, use_ray_tqdm: bool, show_tasks_bar: bool = False, disable: bool = False) -> None:
+        self.use_ray_tqdm = use_ray_tqdm
+        self.show_tasks_bar = show_tasks_bar
+        self.tqdm_mod = tqdm
+        self.pbars: dict[int, tqdm] = dict()
+        self.disable = (
+            disable
+            or not bool(int(os.environ.get("RAY_TQDM", "1")))
+            or not bool(int(os.environ.get("DAFT_PROGRESS_BAR", "1")))
+        )
+
+    def _make_new_bar(self, stage_id: int, name: str):
+        if self.use_ray_tqdm:
+            self.pbars[stage_id] = self.tqdm_mod(total=1, desc=name, position=len(self.pbars))
+        else:
+            self.pbars[stage_id] = self.tqdm_mod(
+                total=1, desc=name, position=len(self.pbars), leave=False, mininterval=1.0
+            )
+
+    def mark_task_start(self, step: PartitionTask[Any]) -> None:
+        if self.disable:
+            return
+        if self.show_tasks_bar:
+            if len(self.pbars) == 0:
+                self._make_new_bar(-1, "Tasks")
+            else:
+                task_pbar = self.pbars[-1]
+                task_pbar.total += 1
+
+        stage_id = step.stage_id
+
+        if stage_id not in self.pbars:
+            name = "-".join(i.__class__.__name__ for i in step.instructions)
+            self._make_new_bar(stage_id, name)
+        else:
+            pb = self.pbars[stage_id]
+            pb.total += 1
+
+    def mark_task_done(self, step: PartitionTask[Any]) -> None:
+        if self.disable:
+            return
+
+        stage_id = step.stage_id
+        self.pbars[stage_id].update(1)
+        if self.show_tasks_bar:
+            self.pbars[-1].update(1)
+
+    def close(self) -> None:
+        for p in self.pbars.values():
+            p.close()
+            del p
diff --git a/daft/runners/pyrunner.py b/daft/runners/pyrunner.py
index ea818b6755..1e950ea767 100644
--- a/daft/runners/pyrunner.py
+++ b/daft/runners/pyrunner.py
@@ -4,7 +4,7 @@
 import multiprocessing
 from concurrent import futures
 from dataclasses import dataclass
-from typing import TYPE_CHECKING, Iterable, Iterator
+from typing import Iterable, Iterator
 
 import psutil
 
@@ -29,13 +29,10 @@
     PartitionSet,
 )
 from daft.runners.profiler import profiler
+from daft.runners.progress_bar import ProgressBar
 from daft.runners.runner import Runner
 from daft.table import Table
 
-if TYPE_CHECKING:
-    pass
-
-
 logger = logging.getLogger(__name__)
 
@@ -149,7 +146,6 @@ def run_iter(
         }
         # Get executable tasks from planner.
         tasks = plan_scheduler.to_partition_tasks(psets, is_ray_runner=False)
-
         with profiler("profile_PyRunner.run_{datetime.now().isoformat()}.json"):
             partitions_gen = self._physical_plan_to_partitions(tasks)
             yield from partitions_gen
@@ -162,6 +158,7 @@ def _physical_plan_to_partitions(self, plan: physical_plan.MaterializedPhysicalP
         inflight_tasks_resources: dict[str, ResourceRequest] = dict()
         future_to_task: dict[futures.Future, str] = dict()
 
+        pbar = ProgressBar(use_ray_tqdm=False)
         with futures.ThreadPoolExecutor() as thread_pool:
             try:
                 next_step = next(plan)
@@ -207,11 +204,16 @@ def _physical_plan_to_partitions(self, plan: physical_plan.MaterializedPhysicalP
                    else:
                        # Submit the task for execution.
                        logger.debug("Submitting task for execution: %s", next_step)
+
+                        # update progress bar
+                        pbar.mark_task_start(next_step)
+
                        future = thread_pool.submit(
                            self.build_partitions, next_step.instructions, *next_step.inputs
                        )
                        # Register the inflight task and resources used.
                        future_to_task[future] = next_step.id()
+
                        inflight_tasks[next_step.id()] = next_step
                        inflight_tasks_resources[next_step.id()] = next_step.resource_request
 
@@ -228,6 +230,8 @@ def _physical_plan_to_partitions(self, plan: physical_plan.MaterializedPhysicalP
                     done_task = inflight_tasks.pop(done_id)
                     partitions = done_future.result()
 
+                    pbar.mark_task_done(done_task)
+
                     logger.debug("Task completed: %s -> <%s partitions>", done_id, len(partitions))
                     done_task.set_result([PyMaterializedResult(partition) for partition in partitions])
 
@@ -235,6 +239,7 @@ def _physical_plan_to_partitions(self, plan: physical_plan.MaterializedPhysicalP
                 next_step = next(plan)
 
             except StopIteration:
+                pbar.close()
                 return
 
     def _check_resource_requests(self, resource_request: ResourceRequest) -> None:
diff --git a/daft/runners/ray_runner.py b/daft/runners/ray_runner.py
index 63632708a1..ccc7dd19a7 100644
--- a/daft/runners/ray_runner.py
+++ b/daft/runners/ray_runner.py
@@ -13,6 +13,7 @@
 
 from daft.logical.builder import LogicalPlanBuilder
 from daft.plan_scheduler import PhysicalPlanScheduler
+from daft.runners.progress_bar import ProgressBar
 
 logger = logging.getLogger(__name__)
 
@@ -382,7 +383,7 @@ def _ray_num_cpus_provider(ttl_seconds: int = 1) -> Generator[int, None, None]:
 
 
 class Scheduler:
-    def __init__(self, max_task_backlog: int | None) -> None:
+    def __init__(self, max_task_backlog: int | None, use_ray_tqdm: bool) -> None:
         """
         max_task_backlog: Max number of inflight tasks waiting for cores.
         """
@@ -403,6 +404,8 @@ def __init__(self, max_task_backlog: int | None) -> None:
         self.results_by_df: dict[str, Queue] = {}
         self.active_by_df: dict[str, bool] = dict()
 
+        self.use_ray_tqdm = use_ray_tqdm
+
     def next(self, result_uuid: str) -> ray.ObjectRef | StopIteration:
         # Case: thread is terminated and no longer exists.
         # Should only be hit for repeated calls to next() after StopIteration.
@@ -465,7 +468,7 @@ def _run_plan(
 
         inflight_tasks: dict[str, PartitionTask[ray.ObjectRef]] = dict()
         inflight_ref_to_task: dict[ray.ObjectRef, str] = dict()
-
+        pbar = ProgressBar(use_ray_tqdm=self.use_ray_tqdm)
         num_cpus_provider = _ray_num_cpus_provider()
 
         start = datetime.now()
@@ -515,7 +518,9 @@ def _run_plan(
 
                    # Dispatch the batch of tasks.
                    logger.debug(
-                        f"{(datetime.now() - start).total_seconds()}s: RayRunner dispatching {len(tasks_to_dispatch)} tasks:"
+                        "%ss: RayRunner dispatching %s tasks",
+                        (datetime.now() - start).total_seconds(),
+                        len(tasks_to_dispatch),
                    )
                    for task in tasks_to_dispatch:
                        results = _build_partitions(task)
@@ -524,6 +529,8 @@ def _run_plan(
                        for result in results:
                            inflight_ref_to_task[result] = task.id()
 
+                        pbar.mark_task_start(task)
+
                    if dispatches_allowed == 0 or next_step is None:
                        break
 
@@ -562,10 +569,11 @@ def _run_plan(
                    for partition in task.partitions():
                        del inflight_ref_to_task[partition]
 
+                    pbar.mark_task_done(task)
                    del inflight_tasks[task_id]
 
                logger.debug(
-                    f"+{(datetime.now() - dispatch).total_seconds()}s to await results from {completed_task_ids}"
+                    "%ss to await results from %s", (datetime.now() - dispatch).total_seconds(), completed_task_ids
                )
 
                if next_step is None:
@@ -577,8 +585,11 @@ def _run_plan(
             # Ensure that all Exceptions are correctly propagated to the consumer before reraising to kill thread
             except Exception as e:
                 self.results_by_df[result_uuid].put(e)
+                pbar.close()
                 raise
 
+        pbar.close()
+
 
 @ray.remote(num_cpus=1)
 class SchedulerActor(Scheduler):
@@ -628,12 +639,10 @@ def __init__(
         if isinstance(self.ray_context, ray.client_builder.ClientContext):
             # Run scheduler remotely if the cluster is connected remotely.
             self.scheduler_actor = SchedulerActor.remote(  # type: ignore
-                max_task_backlog=max_task_backlog,
+                max_task_backlog=max_task_backlog, use_ray_tqdm=True
             )
         else:
-            self.scheduler = Scheduler(
-                max_task_backlog=max_task_backlog,
-            )
+            self.scheduler = Scheduler(max_task_backlog=max_task_backlog, use_ray_tqdm=False)
 
     def active_plans(self) -> list[str]:
         if isinstance(self.ray_context, ray.client_builder.ClientContext):
diff --git a/pyproject.toml b/pyproject.toml
index 5889a6fb40..f1bf3bfedc 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -8,6 +8,7 @@ dependencies = [
     "pyarrow >= 6.0.1",
     "fsspec[http]",
     "psutil",
+    "tqdm",
     "typing-extensions >= 4.0.0; python_version < '3.10'",
     "pickle5 >= 0.0.12; python_version < '3.8'"
 ]
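
Illustration only (not part of the patch): a minimal sketch of how a runner is expected to drive the new ProgressBar, assuming a daft build that includes this diff. FakeInstruction and FakeTask below are hypothetical stand-ins for an execution_step Instruction and a PartitionTask; only the stage_id and instructions attributes that ProgressBar actually reads are modeled.

    # Hypothetical driver sketch: mirrors how PyRunner calls the ProgressBar, using stand-in tasks.
    from dataclasses import dataclass, field

    from daft.runners.progress_bar import ProgressBar


    @dataclass
    class FakeInstruction:  # stand-in for an execution_step Instruction
        pass


    @dataclass
    class FakeTask:  # stand-in for PartitionTask; ProgressBar reads only .stage_id and .instructions
        stage_id: int
        instructions: list = field(default_factory=lambda: [FakeInstruction()])


    pbar = ProgressBar(use_ray_tqdm=False)  # PyRunner constructs it this way; the Ray Scheduler passes use_ray_tqdm
    for task in [FakeTask(stage_id=1), FakeTask(stage_id=1), FakeTask(stage_id=2)]:
        pbar.mark_task_start(task)  # one tqdm bar per stage_id; repeats for a stage bump that bar's total
        pbar.mark_task_done(task)   # advances the bar belonging to that stage
    pbar.close()

Because stage ids are handed out once per physical-plan operator (via stage_id_counter) and stamped onto every task it finalizes, each operator ends up with its own bar, and setting RAY_TQDM=0 or DAFT_PROGRESS_BAR=0 disables the bars entirely.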