Skip to content

Commit

Permalink
Add tracing of the scheduling loop
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Chia committed Oct 26, 2024
1 parent 6faf08f commit 98e39a8
Show file tree
Hide file tree
Showing 2 changed files with 227 additions and 184 deletions.
294 changes: 151 additions & 143 deletions daft/runners/ray_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -671,12 +671,12 @@ def teardown_actor_pool(self, name: str) -> None:

def _construct_dispatch_batch(
self,
next_step: PartitionTask | MaterializedResult | None,
tasks: ray_tracing.MaterializedPhysicalPlanWrapper,
dispatches_allowed: int,
place_in_queue: Callable[[MaterializedResult], None],
is_active: Callable[[], bool],
) -> tuple[PartitionTask | MaterializedResult | None, list[PartitionTask]]:
runner_tracer: RunnerTracer,
) -> tuple[list[PartitionTask], bool]:
"""Constructs a batch of PartitionTasks that should be dispatched
Args:
Expand All @@ -687,57 +687,60 @@ def _construct_dispatch_batch(
is_active (callable[[], bool]): A function that returns True if the execution should continue, False otherwise.
Returns:
tuple[PartitionTask | MaterializedResult | None, list[PartitionTask]]: A tuple containing:
- The next step in the physical plan after constructing the batch.
tuple[list[PartitionTask], bool]: A tuple containing:
- A list of PartitionTasks to be dispatched.
- A pagination boolean indicating whether or not there are more tasks to be had by calling _construct_dispatch_batch again
"""
tasks_to_dispatch: list[PartitionTask] = []
with runner_tracer.dispatch_batching():
tasks_to_dispatch: list[PartitionTask] = []

# Loop until:
# - Reached the limit of the number of tasks we are allowed to dispatch
# - Encounter a `None` as the next step (short-circuit and return has_next=False)
# - The runner becomes inactive
while len(tasks_to_dispatch) < dispatches_allowed and is_active():
next_step = next(tasks)

# Loop: Get a batch of tasks.
while len(tasks_to_dispatch) < dispatches_allowed and is_active():
if next_step is None:
# Blocked on already dispatched tasks; await some tasks.
break
# Blocked on already dispatched tasks; early terminate and mark has_next=False
if next_step is None:
return tasks_to_dispatch, False

elif isinstance(next_step, MaterializedResult):
# A final result.
place_in_queue(next_step)
next_step = next(tasks)
elif isinstance(next_step, MaterializedResult):
place_in_queue(next_step)

# No-op task: just run it locally immediately.
elif len(next_step.instructions) == 0:
logger.debug("Running task synchronously in main thread: %s", next_step)
assert (
len(next_step.partial_metadatas) == 1
), "No-op tasks must have one output by definition, since there are no instructions to run"
[single_partial] = next_step.partial_metadatas
if single_partial.num_rows is None:
[single_meta] = ray.get(get_metas.remote(next_step.inputs))
accessor = PartitionMetadataAccessor.from_metadata_list(
[single_meta.merge_with_partial(single_partial)]
)
else:
accessor = PartitionMetadataAccessor.from_metadata_list(
[
PartitionMetadata(
num_rows=single_partial.num_rows,
size_bytes=single_partial.size_bytes,
boundaries=single_partial.boundaries,
)
]
)

# next_step is a task.

# If it is a no-op task, just run it locally immediately.
elif len(next_step.instructions) == 0:
logger.debug("Running task synchronously in main thread: %s", next_step)
assert (
len(next_step.partial_metadatas) == 1
), "No-op tasks must have one output by definition, since there are no instructions to run"
[single_partial] = next_step.partial_metadatas
if single_partial.num_rows is None:
[single_meta] = ray.get(get_metas.remote(next_step.inputs))
accessor = PartitionMetadataAccessor.from_metadata_list(
[single_meta.merge_with_partial(single_partial)]
next_step.set_result(
[RayMaterializedResult(partition, accessor, 0) for partition in next_step.inputs]
)
else:
accessor = PartitionMetadataAccessor.from_metadata_list(
[
PartitionMetadata(
num_rows=single_partial.num_rows,
size_bytes=single_partial.size_bytes,
boundaries=single_partial.boundaries,
)
]
)

next_step.set_result([RayMaterializedResult(partition, accessor, 0) for partition in next_step.inputs])
next_step = next(tasks)

else:
# Add the task to the batch.
tasks_to_dispatch.append(next_step)
next_step = next(tasks)
# Actual task that needs to be dispatched
else:
tasks_to_dispatch.append(next_step)

return next_step, tasks_to_dispatch
return tasks_to_dispatch, True

def _dispatch_tasks(
self,
Expand All @@ -746,17 +749,18 @@ def _dispatch_tasks(
daft_execution_config: PyDaftExecutionConfig,
runner_tracer: RunnerTracer,
) -> Iterator[tuple[PartitionTask, list[ray.ObjectRef]]]:
for task in tasks_to_dispatch:
if task.actor_pool_id is None:
results = _build_partitions(result_uuid, daft_execution_config, task, runner_tracer)
else:
actor_pool = self._actor_pools.get(task.actor_pool_id)
assert actor_pool is not None, "Ray actor pool must live for as long as the tasks."
# TODO: Add tracing for submissions to actor pool
results = _build_partitions_on_actor_pool(task, actor_pool)
logger.debug("%s -> %s", task, results)
with runner_tracer.dispatching():
for task in tasks_to_dispatch:
if task.actor_pool_id is None:
results = _build_partitions(result_uuid, daft_execution_config, task, runner_tracer)
else:
actor_pool = self._actor_pools.get(task.actor_pool_id)
assert actor_pool is not None, "Ray actor pool must live for as long as the tasks."
# TODO: Add tracing for submissions to actor pool
results = _build_partitions_on_actor_pool(task, actor_pool)
logger.debug("%s -> %s", task, results)

yield (task, results)
yield (task, results)

def _await_tasks(
self,
Expand All @@ -765,12 +769,13 @@ def _await_tasks(
inflight_ref_to_task_id: dict[ray.ObjectRef, str],
runner_tracer: RunnerTracer,
) -> list[ray.ObjectRef]:
readies, not_readies = ray.wait(
list(inflight_ref_to_task_id.keys()),
num_returns=num_returns,
timeout=timeout,
fetch_local=False,
)
with runner_tracer.awaiting(num_returns, timeout):
readies, not_readies = ray.wait(
list(inflight_ref_to_task_id.keys()),
num_returns=num_returns,
timeout=timeout,
fetch_local=False,
)

for ready in readies:
if ready in inflight_ref_to_task_id:
Expand Down Expand Up @@ -825,92 +830,95 @@ def place_in_queue(item):
tasks = ray_tracing.MaterializedPhysicalPlanWrapper(raw_tasks, runner_tracer)

try:
next_step = next(tasks)

wave_count = 0
while is_active(): # Loop: Dispatch -> await.
while is_active(): # Loop: Dispatch (get tasks -> batch dispatch).
# Update available cluster resources based on the most current information
# TODO: improve control loop code to be more understandable and dynamically adjust backlog
cores: int = max(
next(num_cpus_provider) - self.reserved_cores, 1
) # assume at least 1 CPU core for bootstrapping clusters that scale from zero
max_inflight_tasks = cores + self.max_task_backlog
dispatches_allowed = max_inflight_tasks - len(inflight_tasks)
dispatches_allowed = min(cores, dispatches_allowed)

# Construct a batch of dispatchable tasks
next_step, tasks_to_dispatch = self._construct_dispatch_batch(
next_step,
tasks,
dispatches_allowed,
place_in_queue,
is_active,
)
wave_count += 1
with runner_tracer.dispatch_wave(wave_count):
###
# Dispatch Loop: Dispatch batching -> dispatch
###
while is_active():
# Update available cluster resources based on the most current information
# TODO: improve control loop code to be more understandable and dynamically adjust backlog
cores: int = max(
next(num_cpus_provider) - self.reserved_cores, 1
) # assume at least 1 CPU core for bootstrapping clusters that scale from zero
max_inflight_tasks = cores + self.max_task_backlog
dispatches_allowed = max_inflight_tasks - len(inflight_tasks)
dispatches_allowed = min(cores, dispatches_allowed)

# Construct a batch of dispatchable tasks
tasks_to_dispatch, has_next = self._construct_dispatch_batch(
tasks,
dispatches_allowed,
place_in_queue,
is_active,
runner_tracer,
)

if not is_active():
break
if not is_active():
break

# Dispatch the batch of tasks.
logger.debug(
"%ss: RayRunner dispatching %s tasks",
(datetime.now() - start).total_seconds(),
len(tasks_to_dispatch),
)
for task, result_obj_refs in self._dispatch_tasks(
result_uuid,
tasks_to_dispatch,
daft_execution_config,
runner_tracer,
):
inflight_tasks[task.id()] = task

for result in result_obj_refs:
inflight_ref_to_task[result] = task.id()

pbar.mark_task_start(task)

# Break the dispatch batching/dispatch loop if no more dispatches allowed, or physical plan
# needs work for forward progress
if dispatches_allowed == 0 or next_step is None:
break

# Await a batch of tasks.
# (Awaits the next task, and then the next batch of tasks within 10ms.)
completed_task_ids = []
for wait_for in ("next_one", "next_batch"):
if wait_for == "next_one":
num_returns = 1
timeout = None
elif wait_for == "next_batch":
num_returns = len(inflight_ref_to_task)
timeout = 0.01 # 10ms

if num_returns == 0:
break

readies = self._await_tasks(
num_returns,
timeout,
inflight_ref_to_task,
runner_tracer,
)
for ready in readies:
if ready in inflight_ref_to_task:
task_id = inflight_ref_to_task[ready]
completed_task_ids.append(task_id)
# Mark the entire task associated with the result as done.
task = inflight_tasks[task_id]
if isinstance(task, SingleOutputPartitionTask):
del inflight_ref_to_task[ready]
elif isinstance(task, MultiOutputPartitionTask):
for partition in task.partitions():
del inflight_ref_to_task[partition]

pbar.mark_task_done(task)
del inflight_tasks[task_id]

if next_step is None:
next_step = next(tasks)
# Dispatch the batch of tasks.
logger.debug(
"%ss: RayRunner dispatching %s tasks",
(datetime.now() - start).total_seconds(),
len(tasks_to_dispatch),
)
for task, result_obj_refs in self._dispatch_tasks(
result_uuid,
tasks_to_dispatch,
daft_execution_config,
runner_tracer,
):
inflight_tasks[task.id()] = task

for result in result_obj_refs:
inflight_ref_to_task[result] = task.id()

pbar.mark_task_start(task)

# Break the dispatch batching/dispatch loop if no more dispatches allowed, or physical plan
# needs work for forward progress
if dispatches_allowed == 0 or not has_next:
break

###
# Await: wait for some work to be completed from the current wave's dispatch
# (Awaits the next task, and then the next batch of tasks within 10ms.)
###
completed_task_ids = []
for wait_for in ("next_one", "next_batch"):
if wait_for == "next_one":
num_returns = 1
timeout = None
elif wait_for == "next_batch":
num_returns = len(inflight_ref_to_task)
timeout = 0.01 # 10ms

if num_returns == 0:
break

readies = self._await_tasks(
num_returns,
timeout,
inflight_ref_to_task,
runner_tracer,
)
for ready in readies:
if ready in inflight_ref_to_task:
task_id = inflight_ref_to_task[ready]
completed_task_ids.append(task_id)
# Mark the entire task associated with the result as done.
task = inflight_tasks[task_id]
if isinstance(task, SingleOutputPartitionTask):
del inflight_ref_to_task[ready]
elif isinstance(task, MultiOutputPartitionTask):
for partition in task.partitions():
del inflight_ref_to_task[partition]

pbar.mark_task_done(task)
del inflight_tasks[task_id]

except StopIteration as e:
place_in_queue(e)
Expand Down
Loading

0 comments on commit 98e39a8

Please sign in to comment.