Skip to content

Commit

Permalink
[PERF] Remove stateful actor child materialization limit (#3099)
Browse files Browse the repository at this point in the history
Resolves #2900
  • Loading branch information
kevinzwang authored Oct 21, 2024
1 parent 9338e2e commit 4781ad3
Showing 1 changed file with 2 additions and 10 deletions.
12 changes: 2 additions & 10 deletions daft/execution/physical_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,10 +250,6 @@ def actor_pool_project(
actor_pool_name = f"{stateful_udf_names}-stage={stage_id}"

# Keep track of materializations of the children tasks
#
# Our goal here is to saturate the actors, and so we need a sufficient number of completed child tasks to do so. However
# we do not want too many child tasks to be running (potentially starving our actors) and hence place an upper bound of `num_actors * 2`
child_materializations_buffer_len = num_actors * 2
child_materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()

# Keep track of materializations of the actor_pool tasks
Expand Down Expand Up @@ -313,8 +309,8 @@ def actor_pool_project(
if len(child_materializations) > 0 or len(actor_pool_materializations) > 0:
yield None

# If there is capacity in the pipeline, attempt to schedule child work
elif len(child_materializations) < child_materializations_buffer_len:
# Attempt to schedule child work
else:
try:
child_step = next(child_plan)
except StopIteration:
Expand All @@ -326,10 +322,6 @@ def actor_pool_project(
child_materializations.append(child_step)
yield child_step

# Otherwise, indicate that we need to wait for work to complete
else:
yield None


def monotonically_increasing_id(
child_plan: InProgressPhysicalPlan[PartitionT], column_name: str
Expand Down

0 comments on commit 4781ad3

Please sign in to comment.