From 005500a9a8a4f14352ea2f79bb323390916b4302 Mon Sep 17 00:00:00 2001
From: Sammy Sidhu
Date: Tue, 14 Nov 2023 16:02:22 -0800
Subject: [PATCH] sammy/progress-bar-prototype-pyrunner

---
 daft/execution/execution_step.py | 12 +++++-
 daft/execution/physical_plan.py  | 65 +++++++++++++++++++++-----------
 daft/expressions/expressions.py  |  3 ++
 daft/runners/pyrunner.py         | 28 ++++++++++----
 4 files changed, 76 insertions(+), 32 deletions(-)

diff --git a/daft/execution/execution_step.py b/daft/execution/execution_step.py
index 80a2fbbfcf..edfcd2b2ad 100644
--- a/daft/execution/execution_step.py
+++ b/daft/execution/execution_step.py
@@ -51,6 +51,7 @@ class PartitionTask(Generic[PartitionT]):
     instructions: list[Instruction]
     resource_request: ResourceRequest
     num_results: int
+    stage_id: int
     _id: int = field(default_factory=lambda: next(ID_GEN))
 
     def id(self) -> str:
@@ -110,7 +111,7 @@ def add_instruction(
         self.num_results = instruction.num_outputs()
         return self
 
-    def finalize_partition_task_single_output(self) -> SingleOutputPartitionTask[PartitionT]:
+    def finalize_partition_task_single_output(self, stage_id: int) -> SingleOutputPartitionTask[PartitionT]:
         """Create a SingleOutputPartitionTask from this PartitionTaskBuilder.
 
         Returns a "frozen" version of this PartitionTask that cannot have instructions added.
@@ -125,12 +126,13 @@ def finalize_partition_task_single_output(self) -> SingleOutputPartitionTask[Par
 
         return SingleOutputPartitionTask[PartitionT](
             inputs=self.inputs,
+            stage_id=stage_id,
             instructions=self.instructions,
             num_results=1,
             resource_request=resource_request_final_cpu,
         )
 
-    def finalize_partition_task_multi_output(self) -> MultiOutputPartitionTask[PartitionT]:
+    def finalize_partition_task_multi_output(self, stage_id: int) -> MultiOutputPartitionTask[PartitionT]:
         """Create a MultiOutputPartitionTask from this PartitionTaskBuilder.
 
         Same as finalize_partition_task_single_output, except the output of this PartitionTask is a list of partitions.
@@ -143,6 +145,7 @@ def finalize_partition_task_multi_output(self) -> MultiOutputPartitionTask[Parti
         )
         return MultiOutputPartitionTask[PartitionT](
             inputs=self.inputs,
+            stage_id=stage_id,
             instructions=self.instructions,
             num_results=self.num_results,
             resource_request=resource_request_final_cpu,
@@ -566,6 +569,11 @@ def run_partial_metadata(self, input_metadatas: list[PartialPartitionMetadata])
         ]
 
 
+@dataclass(frozen=True)
+class GlobalLimit(LocalLimit):
+    """Behaves exactly like LocalLimit; a distinct class so the global-limit stage gets its own label in progress reporting."""
+
+
 @dataclass(frozen=True)
 class MapPartition(SingleOutputInstruction):
     map_op: MapPartitionOp
diff --git a/daft/execution/physical_plan.py b/daft/execution/physical_plan.py
index df0b373633..ae9b7460d5 100644
--- a/daft/execution/physical_plan.py
+++ b/daft/execution/physical_plan.py
@@ -53,6 +53,16 @@
 MaterializedPhysicalPlan = Iterator[Union[None, PartitionTask[PartitionT], PartitionT]]
 
 
+def _stage_id_counter():
+    counter = 0
+    while True:
+        counter += 1
+        yield counter  # stage ids run 1, 2, 3, ...: one id per pipeline stage
+
+
+stage_id_counter = _stage_id_counter()  # module-global, shared by all plans in this process
+
+
 def partition_read(
     partitions: Iterator[PartitionT], metadatas: Iterator[PartialPartitionMetadata] | None = None
 ) -> InProgressPhysicalPlan[PartitionT]:
@@ -81,6 +91,7 @@ def file_read(
 
     Yield a plan to read those filenames.
""" materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque() + stage_id = next(stage_id_counter) output_partition_index = 0 while True: @@ -119,7 +130,7 @@ def file_read( try: child_step = next(child_plan) if isinstance(child_step, PartitionTaskBuilder): - child_step = child_step.finalize_partition_task_single_output() + child_step = child_step.finalize_partition_task_single_output(stage_id=stage_id) materializations.append(child_step) yield child_step @@ -185,7 +196,7 @@ def join( # As the materializations complete, emit new steps to join each left and right partition. left_requests: deque[SingleOutputPartitionTask[PartitionT]] = deque() right_requests: deque[SingleOutputPartitionTask[PartitionT]] = deque() - + stage_id = next(stage_id_counter) yield_left = True while True: @@ -237,7 +248,7 @@ def join( try: step = next(next_plan) if isinstance(step, PartitionTaskBuilder): - step = step.finalize_partition_task_single_output() + step = step.finalize_partition_task_single_output(stage_id=stage_id) next_requests.append(step) yield step @@ -302,7 +313,7 @@ def global_limit( remaining_partitions = num_partitions materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque() - + stage_id = next(stage_id_counter) # To dynamically schedule the global limit, we need to apply an appropriate limit to each child partition. # We don't know their exact sizes since they are pending execution, so we will have to iteratively execute them, # count their rows, and then apply and update the remaining limit. @@ -317,16 +328,17 @@ def global_limit( # Apply and deduct the rolling global limit. while len(materializations) > 0 and materializations[0].done(): done_task = materializations.popleft() - - limit = remaining_rows and min(remaining_rows, done_task.partition_metadata().num_rows) + done_task_metadata = done_task.partition_metadata() + limit = remaining_rows and min(remaining_rows, done_task_metadata.num_rows) global_limit_step = PartitionTaskBuilder[PartitionT]( inputs=[done_task.partition()], - partial_metadatas=[done_task.partition_metadata()], - resource_request=ResourceRequest(memory_bytes=done_task.partition_metadata().size_bytes), + partial_metadatas=[done_task_metadata], + resource_request=ResourceRequest(memory_bytes=done_task_metadata.size_bytes), ).add_instruction( - instruction=execution_step.LocalLimit(limit), + instruction=execution_step.GlobalLimit(limit), ) + yield global_limit_step remaining_partitions -= 1 remaining_rows -= limit @@ -346,7 +358,7 @@ def global_limit( partial_metadatas=[done_task.partition_metadata()], resource_request=ResourceRequest(memory_bytes=done_task.partition_metadata().size_bytes), ).add_instruction( - instruction=execution_step.LocalLimit(0), + instruction=execution_step.GlobalLimit(0), ) for _ in range(remaining_partitions) ) @@ -376,10 +388,11 @@ def global_limit( if len(materializations) == 0 and remaining_rows > 0 and partial_meta.num_rows is not None: limit = min(remaining_rows, partial_meta.num_rows) child_step = child_step.add_instruction(instruction=execution_step.LocalLimit(limit)) + remaining_partitions -= 1 remaining_rows -= limit else: - child_step = child_step.finalize_partition_task_single_output() + child_step = child_step.finalize_partition_task_single_output(stage_id=stage_id) materializations.append(child_step) yield child_step @@ -395,7 +408,7 @@ def flatten_plan(child_plan: InProgressPhysicalPlan[PartitionT]) -> InProgressPh """Wrap a plan that emits multi-output tasks to a plan that emits single-output tasks.""" 
     materializations: deque[MultiOutputPartitionTask[PartitionT]] = deque()
-
+    stage_id = next(stage_id_counter)
     while True:
         while len(materializations) > 0 and materializations[0].done():
             done_task = materializations.popleft()
 
@@ -409,7 +422,7 @@ def flatten_plan(child_plan: InProgressPhysicalPlan[PartitionT]) -> InProgressPh
         try:
             step = next(child_plan)
             if isinstance(step, PartitionTaskBuilder):
-                step = step.finalize_partition_task_multi_output()
+                step = step.finalize_partition_task_multi_output(stage_id=stage_id)
             materializations.append(step)
             yield step
 
@@ -436,10 +449,10 @@ def split(
     # Splitting evenly is fairly important if this operation is to be used for parallelism.
     # (optimization TODO: don't materialize if num_rows is already available in physical plan metadata.)
     materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()
-
+    stage_id = next(stage_id_counter)
     for step in child_plan:
         if isinstance(step, PartitionTaskBuilder):
-            step = step.finalize_partition_task_single_output()
+            step = step.finalize_partition_task_single_output(stage_id=stage_id)
         materializations.append(step)
         yield step
 
@@ -503,7 +516,7 @@ def coalesce(
     merges_per_result = deque([stop - start for start, stop in zip(starts, stops)])
 
     materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()
-
+    stage_id = next(stage_id_counter)
     while True:
         # See if we can emit a coalesced partition.
         num_partitions_to_merge = merges_per_result[0]
@@ -545,7 +558,7 @@ def coalesce(
         try:
             child_step = next(child_plan)
             if isinstance(child_step, PartitionTaskBuilder):
-                child_step = child_step.finalize_partition_task_single_output()
+                child_step = child_step.finalize_partition_task_single_output(stage_id=stage_id)
             materializations.append(child_step)
             yield child_step
 
@@ -570,11 +583,12 @@ def reduce(
     """
 
     materializations = list()
+    stage_id = next(stage_id_counter)
 
     # Dispatch all fanouts.
     for step in fanout_plan:
         if isinstance(step, PartitionTaskBuilder):
-            step = step.finalize_partition_task_multi_output()
+            step = step.finalize_partition_task_multi_output(stage_id=stage_id)
         materializations.append(step)
         yield step
 
@@ -611,14 +625,17 @@ def sort(
 
     # First, materialize the child plan.
     source_materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()
+    stage_id_children = next(stage_id_counter)
     for step in child_plan:
         if isinstance(step, PartitionTaskBuilder):
-            step = step.finalize_partition_task_single_output()
+            step = step.finalize_partition_task_single_output(stage_id=stage_id_children)
         source_materializations.append(step)
         yield step
 
     # Sample all partitions (to be used for calculating sort boundaries).
     sample_materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()
+    stage_id_sampling = next(stage_id_counter)
+
     for source in source_materializations:
         while not source.done():
             logger.debug(f"sort blocked on completion of source: {source}")
@@ -632,7 +649,7 @@ def sort(
             .add_instruction(
                 instruction=execution_step.Sample(sort_by=sort_by),
             )
-            .finalize_partition_task_single_output()
+            .finalize_partition_task_single_output(stage_id=stage_id_sampling)
         )
         sample_materializations.append(sample)
 
@@ -643,6 +660,8 @@ def sort(
         logger.debug(f"sort blocked on completion of all samples: {sample_materializations}")
         yield None
 
+    stage_id_reduce = next(stage_id_counter)
+
     # Reduce the samples to get sort boundaries.
     boundaries = (
         PartitionTaskBuilder[PartitionT](
@@ -656,7 +675,7 @@ def sort(
                 descending=descending,
             ),
         )
-        .finalize_partition_task_single_output()
+        .finalize_partition_task_single_output(stage_id=stage_id_reduce)
     )
     yield boundaries
 
@@ -714,7 +733,7 @@ def materialize(
     """
 
     materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()
-
+    stage_id = next(stage_id_counter)
     while True:
         # Check if any inputs finished executing.
         while len(materializations) > 0 and materializations[0].done():
@@ -725,7 +744,7 @@ def materialize(
         try:
             step = next(child_plan)
             if isinstance(step, PartitionTaskBuilder):
-                step = step.finalize_partition_task_single_output()
+                step = step.finalize_partition_task_single_output(stage_id=stage_id)
             materializations.append(step)
 
         assert isinstance(step, (PartitionTask, type(None)))
diff --git a/daft/expressions/expressions.py b/daft/expressions/expressions.py
index 3769de2894..e5af9fa1f7 100644
--- a/daft/expressions/expressions.py
+++ b/daft/expressions/expressions.py
@@ -754,6 +754,9 @@ def resolve_schema(self, schema: Schema) -> Schema:
         fields = [e._to_field(schema) for e in self]
         return Schema._from_field_name_and_types([(f.name, f.dtype) for f in fields])
 
+    def __repr__(self) -> str:
+        return f"{list(self._output_name_to_exprs.values())}"
+
 
 class ExpressionImageNamespace(ExpressionNamespace):
     """Expression operations for image columns."""
diff --git a/daft/runners/pyrunner.py b/daft/runners/pyrunner.py
index dd02572e11..7d1f7393e5 100644
--- a/daft/runners/pyrunner.py
+++ b/daft/runners/pyrunner.py
@@ -4,9 +4,10 @@
 import multiprocessing
 from concurrent import futures
 from dataclasses import dataclass
-from typing import TYPE_CHECKING, Iterable, Iterator
+from typing import Iterable, Iterator
 
 import psutil
+from tqdm.auto import tqdm
 
 from daft.daft import (
     FileFormatConfig,
@@ -32,10 +33,6 @@
 from daft.runners.runner import Runner
 from daft.table import Table
 
-if TYPE_CHECKING:
-    pass
-
-
 logger = logging.getLogger(__name__)
 
 
@@ -149,7 +146,6 @@ def run_iter(
         }
         # Get executable tasks from planner.
         tasks = plan_scheduler.to_partition_tasks(psets, is_ray_runner=False)
-
         with profiler("profile_PyRunner.run_{datetime.now().isoformat()}.json"):
             partitions_gen = self._physical_plan_to_partitions(tasks)
             yield from partitions_gen
@@ -162,6 +158,9 @@ def _physical_plan_to_partitions(self, plan: physical_plan.MaterializedPhysicalP
         inflight_tasks_resources: dict[str, ResourceRequest] = dict()
         future_to_task: dict[futures.Future, str] = dict()
 
+        # Progress bars for each plan stage, keyed by stage_id; created lazily as
+        # tasks are submitted, since the total task count is not known up front.
+        pbars: dict[int, tqdm] = dict()
         with futures.ThreadPoolExecutor() as thread_pool:
             try:
                 next_step = next(plan)
@@ -206,12 +205,24 @@ def _physical_plan_to_partitions(self, plan: physical_plan.MaterializedPhysicalP
 
                     else:
                         # Submit the task for execution.
+                        logger.debug(f"Submitting task for execution: {next_step}")
+                        stage_id = next_step.stage_id
+                        if stage_id not in pbars:
+                            name = "-".join(i.__class__.__name__ for i in next_step.instructions)  # e.g. "ReadFile-LocalLimit"
+                            pbars[stage_id] = tqdm(total=None, desc=name)
+                        pb = pbars[stage_id]
+                        if pb.total is None:
+                            pb.total = 1
+                        else:
+                            pb.total += 1
+                        pb.refresh()  # redraw so the larger total is visible immediately
                         future = thread_pool.submit(
                             self.build_partitions, next_step.instructions, *next_step.inputs
                         )
                         # Register the inflight task and resources used.
                         future_to_task[future] = next_step.id()
+                        # Track the submitted task so its future can be matched back on completion.
                         inflight_tasks[next_step.id()] = next_step
                         inflight_tasks_resources[next_step.id()] = next_step.resource_request
 
@@ -226,8 +237,9 @@ def _physical_plan_to_partitions(self, plan: physical_plan.MaterializedPhysicalP
                     done_id = future_to_task.pop(done_future)
                     del inflight_tasks_resources[done_id]
                     done_task = inflight_tasks.pop(done_id)
+                    stage_id = done_task.stage_id
                     partitions = done_future.result()
-
+                    pbars[stage_id].update(1)  # one more task finished in this stage
                     logger.debug(f"Task completed: {done_id} -> <{len(partitions)} partitions>")
                     done_task.set_result([PyMaterializedResult(partition) for partition in partitions])
 
@@ -235,6 +247,8 @@ def _physical_plan_to_partitions(self, plan: physical_plan.MaterializedPhysicalP
                         try:
                             next_step = next(plan)
                         except StopIteration:
+                            for p in pbars.values():  # plan exhausted; finalize all bars
+                                p.close()
                             return
 
     def _check_resource_requests(self, resource_request: ResourceRequest) -> None:
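
Notes on the mechanisms this patch relies on. The sketches below are
illustrative and not part of the diff; any name that does not appear in the
patch itself (Task, rolling_limits, on_submit, on_complete,
stage_progress_bars) is hypothetical.

Every PartitionTask now carries a stage_id, drawn once per pipeline stage from
a shared counter, so the runner can group tasks into one progress bar per
stage. The patch's _stage_id_counter generator yields the same sequence as
itertools.count(1); a minimal sketch of the tagging pattern:

    import itertools
    from dataclasses import dataclass

    stage_id_counter = itertools.count(1)  # same ids as _stage_id_counter()

    @dataclass
    class Task:  # stand-in for PartitionTask
        name: str
        stage_id: int

    stage_id = next(stage_id_counter)  # one id per stage...
    tasks = [Task(f"scan-{i}", stage_id) for i in range(4)]  # ...shared by its tasks
    assert all(t.stage_id == 1 for t in tasks)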
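
Progress-bar labels are derived from the fused instruction class names on a
task, which is also why the otherwise-empty GlobalLimit subclass exists: the
class name is the user-visible label, and without it both local and global
limits would show up as "LocalLimit". A sketch with stand-in instruction
classes:

    class ReadFile: ...
    class LocalLimit: ...
    class GlobalLimit(LocalLimit): ...  # same behavior, distinct label

    instructions = [ReadFile(), GlobalLimit()]
    desc = "-".join(i.__class__.__name__ for i in instructions)
    assert desc == "ReadFile-GlobalLimit"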
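
The bookkeeping in global_limit is easier to see with concrete numbers: a
global limit of 25 over three partitions of 10 rows each decomposes into
per-partition limits of 10, 10 and 5, then 0 for any partitions left over once
the limit is exhausted. A toy model of that deduction loop (rolling_limits is
a hypothetical helper; the sizes are made up):

    def rolling_limits(partition_num_rows: list[int], limit: int) -> list[int]:
        """Per-partition limits that one global limit decomposes into."""
        remaining = limit
        out = []
        for num_rows in partition_num_rows:
            # Mirrors the patch's `remaining_rows and min(...)` idiom.
            take = min(remaining, num_rows) if remaining else 0
            out.append(take)
            remaining -= take
        return out

    assert rolling_limits([10, 10, 10], 25) == [10, 10, 5]
    assert rolling_limits([10, 10, 10], 0) == [0, 0, 0]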
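
On the runner side, the tqdm usage is the "unknown but growing total" pattern:
a bar is created lazily per stage with total=None, the total is bumped on each
task submission, and update(1) is called as each task completes. A standalone
sketch of that pattern (on_submit/on_complete and the driver loop are
illustrative):

    from tqdm.auto import tqdm

    pbars: dict[int, tqdm] = {}

    def on_submit(stage_id: int, desc: str) -> None:
        if stage_id not in pbars:
            pbars[stage_id] = tqdm(total=None, desc=desc)  # total unknown so far
        pb = pbars[stage_id]
        pb.total = 1 if pb.total is None else pb.total + 1
        pb.refresh()  # redraw so the new denominator shows immediately

    def on_complete(stage_id: int) -> None:
        pbars[stage_id].update(1)

    for _ in range(5):
        on_submit(7, "FanoutHash-ReduceMerge")
    for _ in range(5):
        on_complete(7)
    for pb in pbars.values():
        pb.close()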
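
One robustness note on the prototype: bars are closed only on the
StopIteration path, so an exception raised mid-plan leaves them open and tqdm
output can interleave with the traceback. Wrapping the dispatch loop in a
context manager is one way to guarantee cleanup (stage_progress_bars is a
hypothetical helper, not the patch's code):

    import contextlib

    from tqdm.auto import tqdm

    @contextlib.contextmanager
    def stage_progress_bars():
        pbars: dict[int, tqdm] = {}
        try:
            yield pbars  # callers create and update bars through this dict
        finally:
            for pb in pbars.values():
                pb.close()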