Skip to content

Commit

Permalink
add tasks to top level
Browse files Browse the repository at this point in the history
  • Loading branch information
samster25 committed Nov 15, 2023
1 parent f261cd5 commit 187dabf
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 2 deletions.
13 changes: 11 additions & 2 deletions daft/runners/pyrunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,14 +211,21 @@ def _physical_plan_to_partitions(self, plan: physical_plan.MaterializedPhysicalP
stage_id = next_step.stage_id

if self.show_progress:
if len(pbars) == 0:
pbars[-1] = tqdm(total=1, desc="Tasks", position=0)

Check warning on line 215 in daft/runners/pyrunner.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/pyrunner.py#L214-L215

Added lines #L214 - L215 were not covered by tests
else:
task_pbar = pbars[-1]
task_pbar.total += 1

Check warning on line 218 in daft/runners/pyrunner.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/pyrunner.py#L217-L218

Added lines #L217 - L218 were not covered by tests
# task_pbar.refresh()

if stage_id not in pbars:
name = "-".join(i.__class__.__name__ for i in next_step.instructions)
position = len(pbars)
pbars[stage_id] = tqdm(total=1, desc=name, position=position)
pbars[stage_id] = tqdm(total=1, desc=name, position=position, leave=False)

Check warning on line 224 in daft/runners/pyrunner.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/pyrunner.py#L221-L224

Added lines #L221 - L224 were not covered by tests
else:
pb = pbars[stage_id]
pb.total += 1

Check warning on line 227 in daft/runners/pyrunner.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/pyrunner.py#L226-L227

Added lines #L226 - L227 were not covered by tests
pb.refresh()
# pb.refresh()

future = thread_pool.submit(
self.build_partitions, next_step.instructions, *next_step.inputs
Expand All @@ -244,6 +251,8 @@ def _physical_plan_to_partitions(self, plan: physical_plan.MaterializedPhysicalP
if self.show_progress:
stage_id = done_task.stage_id
pbars[stage_id].update(1)
pbars[-1].update(1)

Check warning on line 254 in daft/runners/pyrunner.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/pyrunner.py#L252-L254

Added lines #L252 - L254 were not covered by tests

logger.debug("Task completed: %s -> <%s partitions>", done_id, len(partitions))
done_task.set_result([PyMaterializedResult(partition) for partition in partitions])

Expand Down
9 changes: 9 additions & 0 deletions daft/runners/ray_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,13 @@ def _run_plan(

if self.show_progress:
for task in tasks_to_dispatch:
if len(pbars) == 0:
pbars[-1] = self.tqdm_builder(total=1, desc="Tasks", position=0)

Check warning on line 545 in daft/runners/ray_runner.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/ray_runner.py#L543-L545

Added lines #L543 - L545 were not covered by tests
else:
task_pbar = pbars[-1]
task_pbar.total += 1
task_pbar.refresh()

Check warning on line 549 in daft/runners/ray_runner.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/ray_runner.py#L547-L549

Added lines #L547 - L549 were not covered by tests

stage_id = task.stage_id
if stage_id not in pbars:
name = "-".join(i.__class__.__name__ for i in task.instructions)
Expand Down Expand Up @@ -592,6 +599,8 @@ def _run_plan(
stage_id = task.stage_id
pb = pbars[stage_id]
pb.update(1)
pb = pbars[-1]
pb.update(1)

Check warning on line 603 in daft/runners/ray_runner.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/ray_runner.py#L599-L603

Added lines #L599 - L603 were not covered by tests

del inflight_tasks[task_id]

Expand Down

0 comments on commit 187dabf

Please sign in to comment.