Skip to content

Commit

Permalink
Kevin comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Chia committed Sep 5, 2024
1 parent 675b0b1 commit bf88e8a
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 3 deletions.
3 changes: 3 additions & 0 deletions daft/execution/physical_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,9 @@ def actor_pool_project(
actor_pool_name = f"ActorPool_stage{stage_id}"

# Keep track of materializations of the children tasks
#
# Our goal here is to saturate the actors, so we need a sufficient number of completed child tasks to do so. However,
# we do not want too many child tasks to be running (potentially starving our actors), and hence we place an upper bound of `num_actors * 2`
child_materializations_buffer_len = num_actors * 2
child_materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()

Expand Down
1 change: 1 addition & 0 deletions daft/runners/pyrunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ def initialize_actor_global_state(uninitialized_projection: ExpressionsProjectio

logger.info("Initializing stateful UDFs: %s", ", ".join(partial_stateful_udfs.keys()))

# TODO: Account for Stateful Actor initialization arguments as well as user-provided batch_size
PyActorPool.initialized_stateful_udfs_process_singleton = {
name: partial_udf.func_cls() for name, partial_udf in partial_stateful_udfs.items()
}
Expand Down
2 changes: 1 addition & 1 deletion daft/runners/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,6 @@ def actor_pool_context(
name: Name of the actor pool for debugging/observability
resource_request: Requested amount of resources for each actor
num_actors: Number of actors to spin up
partial_stateful_udf: A stateful UDF that has been "bound" to its arguments, so each actor can run it
projection: Projection to be run on the incoming data (contains Stateful UDFs as well as other stateless expressions such as aliases)
"""
...
3 changes: 1 addition & 2 deletions tests/actor_pool/test_pyactor_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ def test_pyactor_pool():
ppm = PartialPartitionMetadata(num_rows=None, size_bytes=None)
instr = StatefulUDFProject(projection=projection)

pool_id = pool.setup()
assert pool_id == "my-pool"
pool.setup()

result = pool.submit(
instruction_stack=[instr],
Expand Down

0 comments on commit bf88e8a

Please sign in to comment.