Skip to content

Commit

Permalink
Perform cleanup of tasks and results when iterator is deleted
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Chia committed Sep 8, 2024
1 parent 3c2af5a commit c4121dc
Showing 1 changed file with 25 additions and 8 deletions.
33 changes: 25 additions & 8 deletions daft/runners/pyrunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import contextlib
import logging
import threading
import uuid
from concurrent import futures
from dataclasses import dataclass
from typing import Iterator
Expand Down Expand Up @@ -34,6 +35,13 @@
logger = logging.getLogger(__name__)


# Unique UUID for each execution
ExecutionID = str

# Unique ID for each task
TaskID = str


class LocalPartitionSet(PartitionSet[MicroPartition]):
_partitions: dict[PartID, MaterializedResult[MicroPartition]]

Expand Down Expand Up @@ -223,7 +231,7 @@ def __init__(self, use_thread_pool: bool | None) -> None:
self._actor_pools: dict[str, PyActorPool] = {}

# Global accounting of tasks and resources
self._inflight_futures: dict[str, futures.Future] = {}
self._inflight_futures: dict[tuple[ExecutionID, TaskID], futures.Future] = {}

system_info = SystemInfo()
num_cpus = system_info.cpu_count()
Expand Down Expand Up @@ -263,6 +271,7 @@ def run_iter(
) -> Iterator[PyMaterializedResult]:
# NOTE: Freeze and use this same execution config for the entire execution
daft_execution_config = get_context().daft_execution_config
execution_id = str(uuid.uuid4())

# Optimize the logical plan.
builder = builder.optimize()
Expand All @@ -277,7 +286,7 @@ def run_iter(
results_buffer_size,
)
del plan_scheduler
results_gen = self._physical_plan_to_partitions(tasks)
results_gen = self._physical_plan_to_partitions(execution_id, tasks)
# if source_id is none that means this is the final stage
if source_id is None:
yield from results_gen
Expand Down Expand Up @@ -310,7 +319,7 @@ def run_iter(
tasks = plan_scheduler.to_partition_tasks(psets, results_buffer_size)
del psets
with profiler("profile_PyRunner.run_{datetime.now().isoformat()}.json"):
results_gen = self._physical_plan_to_partitions(tasks)
results_gen = self._physical_plan_to_partitions(execution_id, tasks)
yield from results_gen

def run_iter_tables(
Expand Down Expand Up @@ -346,7 +355,9 @@ def actor_pool_context(
del self._actor_pools[actor_pool_id]

def _physical_plan_to_partitions(
self, plan: physical_plan.MaterializedPhysicalPlan[MicroPartition]
self,
execution_id: str,
plan: physical_plan.MaterializedPhysicalPlan[MicroPartition],
) -> Iterator[PyMaterializedResult]:
local_futures_to_task: dict[futures.Future, PartitionTask] = {}
pbar = ProgressBar(use_ray_tqdm=False)
Expand Down Expand Up @@ -443,7 +454,7 @@ def _physical_plan_to_partitions(
assert (
next_step.id() not in local_futures_to_task
), "Step IDs should be unique - this indicates an internal error, please file an issue!"
self._inflight_futures[next_step.id()] = future
self._inflight_futures[(execution_id, next_step.id())] = future
local_futures_to_task[future] = next_step

next_step = next(plan)
Expand All @@ -463,7 +474,7 @@ def _physical_plan_to_partitions(
materialized_results = done_future.result()

pbar.mark_task_done(done_task)
del self._inflight_futures[done_task.id()]
del self._inflight_futures[(execution_id, done_task.id())]

logger.debug(
"Task completed: %s -> <%s partitions>",
Expand All @@ -476,9 +487,15 @@ def _physical_plan_to_partitions(
if next_step is None:
next_step = next(plan)

except StopIteration:
# Perform any cleanups when the generator is closed (StopIteration is raised, generator is deleted with `__del__` on GC, etc)
finally:
# Close the progress bar
pbar.close()
return

# Cleanup any remaining inflight futures/results from this local execution
for (exec_id, task_id), _ in list(self._inflight_futures.items()):
if exec_id == execution_id:
del self._inflight_futures[(exec_id, task_id)]

def _check_resource_requests(self, resource_request: ResourceRequest) -> None:
"""Validates that the requested ResourceRequest is possible to run locally"""
Expand Down

0 comments on commit c4121dc

Please sign in to comment.