diff --git a/daft/execution/physical_plan.py b/daft/execution/physical_plan.py index 947c37f961..4abf8285dc 100644 --- a/daft/execution/physical_plan.py +++ b/daft/execution/physical_plan.py @@ -215,9 +215,17 @@ def actor_pool_project( # Keep track of materializations of the actor_pool tasks actor_pool_materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque() + # Perform separate accounting for the tasks' resource request and the actors' resource request: + # * When spinning up an actor, we consider resources that are required for the persistent state in an actor (namely, GPUs and memory) + # * When running a task, we consider resources that are required for placement of tasks (namely CPUs) + task_resource_request = ResourceRequest(num_cpus=resource_request.num_cpus) + actor_resource_request = ResourceRequest( + num_gpus=resource_request.num_gpus, memory_bytes=resource_request.memory_bytes + ) + with get_context().runner().actor_pool_context( actor_pool_name, - resource_request, + actor_resource_request, num_actors, projection, ) as actor_pool_id: @@ -232,11 +240,11 @@ def actor_pool_project( PartitionTaskBuilder[PartitionT]( inputs=[next_ready_child.partition()], partial_metadatas=[next_ready_child.partition_metadata()], - resource_request=resource_request, actor_pool_id=actor_pool_id, ) .add_instruction( instruction=execution_step.StatefulUDFProject(projection), + resource_request=task_resource_request, ) .finalize_partition_task_single_output( stage_id=stage_id, diff --git a/daft/runners/ray_runner.py b/daft/runners/ray_runner.py index ce8c430252..cebf758fba 100644 --- a/daft/runners/ray_runner.py +++ b/daft/runners/ray_runner.py @@ -795,8 +795,7 @@ def _build_partitions_on_actor_pool( @ray.remote class DaftRayActor: def __init__(self, daft_execution_config: PyDaftExecutionConfig, uninitialized_projection: ExpressionsProjection): - set_execution_config(daft_execution_config) - + self.daft_execution_config = daft_execution_config partial_stateful_udfs = { name: psu for expr in uninitialized_projection @@ -814,21 +813,22 @@ def run( partial_metadatas: list[PartitionMetadata], *inputs: MicroPartition, ) -> list[list[PartitionMetadata] | MicroPartition]: - assert len(inputs) == 1, "DaftRayActor can only process single partitions" - assert len(partial_metadatas) == 1, "DaftRayActor can only process single partitions (and single metadata)" - part = inputs[0] - partial = partial_metadatas[0] - - # Bind the ExpressionsProjection to the initialized UDFs - initialized_projection = ExpressionsProjection( - [e._bind_stateful_udfs(self.initialized_stateful_udfs) for e in uninitialized_projection] - ) - new_part = part.eval_expression_list(initialized_projection) + with execution_config_ctx(config=self.daft_execution_config): + assert len(inputs) == 1, "DaftRayActor can only process single partitions" + assert len(partial_metadatas) == 1, "DaftRayActor can only process single partitions (and single metadata)" + part = inputs[0] + partial = partial_metadatas[0] + + # Bind the ExpressionsProjection to the initialized UDFs + initialized_projection = ExpressionsProjection( + [e._bind_stateful_udfs(self.initialized_stateful_udfs) for e in uninitialized_projection] + ) + new_part = part.eval_expression_list(initialized_projection) - return [ - [PartitionMetadata.from_table(new_part).merge_with_partial(partial)], - new_part, - ] + return [ + [PartitionMetadata.from_table(new_part).merge_with_partial(partial)], + new_part, + ] class RayRoundRobinActorPool: