Skip to content

Commit

Permalink
Add node affinity for reducer and merger
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Chia committed Sep 29, 2024
1 parent 1e8b87d commit 37b1d8e
Showing 1 changed file with 7 additions and 28 deletions.
35 changes: 7 additions & 28 deletions daft/runners/ray_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1574,36 +1574,15 @@ def _get_reducer_inputs_location(self, reducer_idx: int) -> tuple[int, int]:
raise ValueError(f"Cannot find merger for reducer_idx: {reducer_idx}")

Check warning on line 1574 in daft/runners/ray_runner.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/ray_runner.py#L1573-L1574

Added lines #L1573 - L1574 were not covered by tests

def _merger_options(self, merger_idx: int) -> dict[str, Any]:
# TODO: populate the nth merger's options. Place the nth merge task on the (n % NUM_NODES)th node
#
# node_strategies = {
# node_id: {
# "scheduling_strategy": NodeAffinitySchedulingStrategy(
# node_id, soft=True
# )
# }
# for node_id in set(merge_task_placement)
# }
# self._merge_task_options = [
# node_strategies[node_id] for node_id in merge_task_placement
# ]
return {}
num_nodes = len(ray.nodes())
node_id = ray.nodes()[merger_idx % num_nodes]["NodeID"]
return {

Check warning on line 1579 in daft/runners/ray_runner.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/ray_runner.py#L1577-L1579

Added lines #L1577 - L1579 were not covered by tests
"scheduling_strategy": ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(node_id, soft=True)
}

def _reduce_options(self, reducer_idx: int) -> dict[str, Any]:
# TODO: populate the nth merger's options. Place the nth merge task on the (n % NUM_NODES)th node
#
# node_strategies = {
# node_id: {
# "scheduling_strategy": NodeAffinitySchedulingStrategy(
# node_id, soft=True
# )
# }
# for node_id in set(merge_task_placement)
# }
# self._merge_task_options = [
# node_strategies[node_id] for node_id in merge_task_placement
# ]
return {}
assigned_merger_idx, _ = self._get_reducer_inputs_location(reducer_idx)
return self._merger_options(assigned_merger_idx)

Check warning on line 1585 in daft/runners/ray_runner.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/ray_runner.py#L1584-L1585

Added lines #L1584 - L1585 were not covered by tests

def run(self, materialized_inputs: list[ray.ObjectRef]) -> list[ray.ObjectRef]:
"""Runs the Mappers and Mergers in a 2-stage pipeline until all mergers are materialized
Expand Down

0 comments on commit 37b1d8e

Please sign in to comment.