Skip to content

Commit

Permalink
refactor to pbar
Browse files Browse the repository at this point in the history
  • Loading branch information
samster25 committed Nov 17, 2023
1 parent 7ae3538 commit 8fcf00f
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 84 deletions.
63 changes: 63 additions & 0 deletions daft/runners/progress_bar.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from __future__ import annotations

import os
from typing import Any

from tqdm.auto import tqdm

from daft.execution.execution_step import PartitionTask


class ProgressBar:
    """Progress reporting for a runner: one bar per stage plus an overall "Tasks" bar.

    Bars are created lazily the first time a task for a given stage is started.
    Every started task grows the ``total`` of both its stage bar and the overall
    bar by one; every finished task advances both bars by one.

    Progress reporting is switched off entirely when ``disable`` is true or when
    either of the ``RAY_TQDM`` / ``DAFT_PROGRESS_BAR`` environment variables is
    set to a false value (e.g. ``0``).
    """

    # NOTE: when this class was introduced, Codecov reported the ray-tqdm
    # branches (bar creation and ``refresh()`` calls) and the ``disable``
    # early-returns as not covered by tests.

    # Key of the aggregate "Tasks" bar inside ``pbars``.
    # NOTE(review): assumes real stage ids never equal -1 -- confirm with callers.
    _TASKS_BAR_ID = -1

    def __init__(self, use_ray_tqdm: bool, disable: bool = False) -> None:
        """Set up an empty collection of bars.

        Args:
            use_ray_tqdm: Selects the ray-tqdm code paths (bars are created
                without ``leave`` and are refreshed after each total change).
            disable: Force-disable all progress reporting.
        """
        self.use_ray_tqdm = use_ray_tqdm
        # NOTE(review): the same ``tqdm.auto`` class is used whatever
        # ``use_ray_tqdm`` is; the flag only changes how bars are built and
        # refreshed. Confirm whether Ray's own tqdm was meant to be used here.
        self.tqdm_mod = tqdm
        self.pbars: dict[int, tqdm] = dict()
        self.disable = (
            disable
            or not self._env_flag("RAY_TQDM")
            or not self._env_flag("DAFT_PROGRESS_BAR")
        )

    @staticmethod
    def _env_flag(name: str, default: str = "1") -> bool:
        """Read a boolean switch from the environment.

        Integer values keep their historical meaning (``0`` is false, any other
        integer is true). Non-integer values no longer raise ``ValueError``:
        ``false`` / ``no`` / ``off`` / empty are false, anything else is true.
        """
        raw = os.environ.get(name, default).strip()
        try:
            return bool(int(raw))
        except ValueError:
            return raw.lower() not in {"", "false", "no", "off"}

    def _make_new_bar(self, stage_id: int, name: str) -> None:
        """Create a bar with ``total=1`` for ``stage_id`` below all existing bars."""
        kwargs: dict[str, Any] = dict(total=1, desc=name, position=len(self.pbars))
        if not self.use_ray_tqdm:
            # Only the plain tqdm path passes ``leave``; presumably the ray
            # variant does not accept it -- TODO confirm.
            kwargs["leave"] = False
        self.pbars[stage_id] = self.tqdm_mod(**kwargs)

    def _grow_bar(self, bar_id: int) -> bool:
        """Add one to the total of an existing bar.

        Returns:
            ``True`` if the bar existed and was grown, ``False`` if no bar is
            registered under ``bar_id`` (the caller should then create it).
        """
        bar = self.pbars.get(bar_id)
        if bar is None:
            return False
        bar.total += 1
        if self.use_ray_tqdm:
            # On the ray-tqdm path the display is refreshed explicitly after a
            # total change.
            bar.refresh()
        return True

    def mark_task_start(self, step: PartitionTask[Any]) -> None:
        """Record that ``step`` was submitted: grow (or create) its bars.

        Args:
            step: The task being started; its ``stage_id`` selects the stage
                bar and its ``instructions`` provide the bar's description.
        """
        if self.disable:
            return

        if not self._grow_bar(self._TASKS_BAR_ID):
            self._make_new_bar(self._TASKS_BAR_ID, "Tasks")

        stage_id = step.stage_id
        if not self._grow_bar(stage_id):
            # The description is only built when a new bar is actually needed.
            name = "-".join(i.__class__.__name__ for i in step.instructions)
            self._make_new_bar(stage_id, name)

    def mark_task_done(self, step: PartitionTask[Any]) -> None:
        """Record that ``step`` finished: advance its stage bar and the overall bar.

        Raises:
            KeyError: If no bar exists for the step's stage, i.e. the task was
                never passed to ``mark_task_start``.
        """
        if self.disable:
            return

        self.pbars[step.stage_id].update(1)
        self.pbars[self._TASKS_BAR_ID].update(1)

    def close(self) -> None:
        """Close every bar and forget them so closed bars are never reused."""
        for bar in self.pbars.values():
            bar.close()
        self.pbars.clear()
38 changes: 8 additions & 30 deletions daft/runners/pyrunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from typing import Iterable, Iterator

import psutil
from tqdm.auto import tqdm

from daft.daft import (
FileFormatConfig,
Expand All @@ -30,6 +29,7 @@
PartitionSet,
)
from daft.runners.profiler import profiler
from daft.runners.progress_bar import ProgressBar
from daft.runners.runner import Runner
from daft.table import Table

Expand Down Expand Up @@ -114,9 +114,6 @@ def __init__(self, use_thread_pool: bool | None) -> None:
self.num_cpus = multiprocessing.cpu_count()
self.num_gpus = cuda_device_count()
self.bytes_memory = psutil.virtual_memory().total
from .tqdm import is_running_from_ipython

self.show_progress = is_running_from_ipython()

def runner_io(self) -> PyRunnerIO:
return PyRunnerIO()
Expand Down Expand Up @@ -161,7 +158,7 @@ def _physical_plan_to_partitions(self, plan: physical_plan.MaterializedPhysicalP
inflight_tasks_resources: dict[str, ResourceRequest] = dict()
future_to_task: dict[futures.Future, str] = dict()

pbars: dict[int, tqdm] = dict()
pbar = ProgressBar(use_ray_tqdm=False)
with futures.ThreadPoolExecutor() as thread_pool:
try:
next_step = next(plan)
Expand Down Expand Up @@ -206,26 +203,10 @@ def _physical_plan_to_partitions(self, plan: physical_plan.MaterializedPhysicalP

else:
# Submit the task for execution.

logger.debug("Submitting task for execution: %s", next_step)
stage_id = next_step.stage_id

if self.show_progress:
if len(pbars) == 0:
pbars[-1] = tqdm(total=1, desc="Tasks", position=0)
else:
task_pbar = pbars[-1]
task_pbar.total += 1
# task_pbar.refresh()

if stage_id not in pbars:
name = "-".join(i.__class__.__name__ for i in next_step.instructions)
position = len(pbars)
pbars[stage_id] = tqdm(total=1, desc=name, position=position, leave=False)
else:
pb = pbars[stage_id]
pb.total += 1
# pb.refresh()

# update progress bar
pbar.mark_task_start(next_step)

future = thread_pool.submit(
self.build_partitions, next_step.instructions, *next_step.inputs
Expand All @@ -248,10 +229,8 @@ def _physical_plan_to_partitions(self, plan: physical_plan.MaterializedPhysicalP
del inflight_tasks_resources[done_id]
done_task = inflight_tasks.pop(done_id)
partitions = done_future.result()
if self.show_progress:
stage_id = done_task.stage_id
pbars[stage_id].update(1)
pbars[-1].update(1)

pbar.mark_task_done(done_task)

logger.debug("Task completed: %s -> <%s partitions>", done_id, len(partitions))
done_task.set_result([PyMaterializedResult(partition) for partition in partitions])
Expand All @@ -260,8 +239,7 @@ def _physical_plan_to_partitions(self, plan: physical_plan.MaterializedPhysicalP
next_step = next(plan)

except StopIteration:
for p in pbars.values():
p.close()
pbar.close()
return

def _check_resource_requests(self, resource_request: ResourceRequest) -> None:
Expand Down
51 changes: 7 additions & 44 deletions daft/runners/ray_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from daft.logical.builder import LogicalPlanBuilder
from daft.plan_scheduler import PhysicalPlanScheduler
from daft.runners.progress_bar import ProgressBar

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -59,7 +60,6 @@
import pandas as pd
from ray.data.block import Block as RayDatasetBlock
from ray.data.dataset import Dataset as RayDataset
from tqdm import tqdm

_RAY_FROM_ARROW_REFS_AVAILABLE = True
try:
Expand Down Expand Up @@ -403,18 +403,7 @@ def __init__(self, max_task_backlog: int | None, use_ray_tqdm: bool) -> None:
self.results_by_df: dict[str, Queue] = {}
self.active_by_df: dict[str, bool] = dict()

from .tqdm import is_running_from_ipython

if use_ray_tqdm:
from ray.experimental import tqdm_ray

self.tqdm_builder = tqdm_ray
else:
from tqdm.auto import tqdm

self.tqdm_builder = tqdm

self.show_progress = is_running_from_ipython()
self.use_ray_tqdm = use_ray_tqdm

def next(self, result_uuid: str) -> ray.ObjectRef | StopIteration:
# Case: thread is terminated and no longer exists.
Expand Down Expand Up @@ -478,7 +467,7 @@ def _run_plan(

inflight_tasks: dict[str, PartitionTask[ray.ObjectRef]] = dict()
inflight_ref_to_task: dict[ray.ObjectRef, str] = dict()
pbars: dict[int, tqdm] = dict()
pbar = ProgressBar(use_ray_tqdm=self.use_ray_tqdm)
num_cpus_provider = _ray_num_cpus_provider()

start = datetime.now()
Expand Down Expand Up @@ -539,24 +528,7 @@ def _run_plan(
for result in results:
inflight_ref_to_task[result] = task.id()

if self.show_progress:
for task in tasks_to_dispatch:
if len(pbars) == 0:
pbars[-1] = self.tqdm_builder(total=1, desc="Tasks", position=0)
else:
task_pbar = pbars[-1]
task_pbar.total += 1
task_pbar.refresh()

stage_id = task.stage_id
if stage_id not in pbars:
name = "-".join(i.__class__.__name__ for i in task.instructions)
position = len(pbars)
pbars[stage_id] = self.tqdm_builder(total=1, desc=name, position=position)
else:
pb = pbars[stage_id]
pb.total += 1
pb.refresh()
pbar.mark_task_start(task)

if dispatches_allowed == 0 or next_step is None:
break
Expand Down Expand Up @@ -595,13 +567,8 @@ def _run_plan(
elif isinstance(task, MultiOutputPartitionTask):
for partition in task.partitions():
del inflight_ref_to_task[partition]
if self.show_progress:
stage_id = task.stage_id
pb = pbars[stage_id]
pb.update(1)
pb = pbars[-1]
pb.update(1)

pbar.mark_task_done(task)
del inflight_tasks[task_id]

logger.debug(
Expand All @@ -617,14 +584,10 @@ def _run_plan(
# Ensure that all Exceptions are correctly propagated to the consumer before reraising to kill thread
except Exception as e:
self.results_by_df[result_uuid].put(e)
for p in pbars.values():
p.close()
del pbars
pbar.close()

Check warning on line 587 in daft/runners/ray_runner.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/ray_runner.py#L587

Added line #L587 was not covered by tests
raise

for p in pbars.values():
p.close()
del pbars
pbar.close()


@ray.remote(num_cpus=1)
Expand Down
10 changes: 0 additions & 10 deletions daft/runners/tqdm.py

This file was deleted.

0 comments on commit 8fcf00f

Please sign in to comment.