Skip to content

Commit

Permalink
Perform separate accounting for resources on the task vs the actor pool
Browse files (browse the repository at this point in the history)
  • Loading branch information
Jay Chia committed Aug 23, 2024
1 parent e940427 commit c2fe224
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 18 deletions.
12 changes: 10 additions & 2 deletions daft/execution/physical_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,17 @@ def actor_pool_project(
# Keep track of materializations of the actor_pool tasks
actor_pool_materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()

# Perform separate accounting for the tasks' resource request and the actors' resource request:
# * When spinning up an actor, we consider resources that are required for the persistent state in an actor (namely, GPUs and memory)
# * When running a task, we consider resources that are required for placement of tasks (namely, CPUs)
task_resource_request = ResourceRequest(num_cpus=resource_request.num_cpus)
actor_resource_request = ResourceRequest(
num_gpus=resource_request.num_gpus, memory_bytes=resource_request.memory_bytes
)

with get_context().runner().actor_pool_context(
actor_pool_name,
resource_request,
actor_resource_request,
num_actors,
projection,
) as actor_pool_id:
Expand All @@ -232,11 +240,11 @@ def actor_pool_project(
PartitionTaskBuilder[PartitionT](
inputs=[next_ready_child.partition()],
partial_metadatas=[next_ready_child.partition_metadata()],
resource_request=resource_request,
actor_pool_id=actor_pool_id,
)
.add_instruction(
instruction=execution_step.StatefulUDFProject(projection),
resource_request=task_resource_request,
)
.finalize_partition_task_single_output(
stage_id=stage_id,
Expand Down
32 changes: 16 additions & 16 deletions daft/runners/ray_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -795,8 +795,7 @@ def _build_partitions_on_actor_pool(
@ray.remote
class DaftRayActor:
def __init__(self, daft_execution_config: PyDaftExecutionConfig, uninitialized_projection: ExpressionsProjection):
set_execution_config(daft_execution_config)

self.daft_execution_config = daft_execution_config
partial_stateful_udfs = {
name: psu
for expr in uninitialized_projection
Expand All @@ -814,21 +813,22 @@ def run(
partial_metadatas: list[PartitionMetadata],
*inputs: MicroPartition,
) -> list[list[PartitionMetadata] | MicroPartition]:
assert len(inputs) == 1, "DaftRayActor can only process single partitions"
assert len(partial_metadatas) == 1, "DaftRayActor can only process single partitions (and single metadata)"
part = inputs[0]
partial = partial_metadatas[0]

# Bind the ExpressionsProjection to the initialized UDFs
initialized_projection = ExpressionsProjection(
[e._bind_stateful_udfs(self.initialized_stateful_udfs) for e in uninitialized_projection]
)
new_part = part.eval_expression_list(initialized_projection)
with execution_config_ctx(config=self.daft_execution_config):
assert len(inputs) == 1, "DaftRayActor can only process single partitions"
assert len(partial_metadatas) == 1, "DaftRayActor can only process single partitions (and single metadata)"
part = inputs[0]
partial = partial_metadatas[0]

# Bind the ExpressionsProjection to the initialized UDFs
initialized_projection = ExpressionsProjection(
[e._bind_stateful_udfs(self.initialized_stateful_udfs) for e in uninitialized_projection]
)
new_part = part.eval_expression_list(initialized_projection)

return [
[PartitionMetadata.from_table(new_part).merge_with_partial(partial)],
new_part,
]
return [
[PartitionMetadata.from_table(new_part).merge_with_partial(partial)],
new_part,
]


class RayRoundRobinActorPool:
Expand Down

0 comments on commit c2fe224

Please sign in to comment.