Skip job-level launch when using BEST_EFFORT scaling strategy. (#568)
Signed-off-by: Eddy Mwiti <[email protected]>
EddyMM authored Oct 6, 2023
1 parent 6acd753 commit 67d32db
Showing 2 changed files with 46 additions and 6 deletions.
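
In short, the change threads a new `skip_launch` flag from `_scaling_for_jobs_multi_node` through `_scaling_for_jobs` and `_add_instances_for_nodes` down to `_launch_instances`: when the scaling strategy is BEST_EFFORT, the all-in launch performed up front is the only launch, and the later per-job launch step becomes a no-op. Below is a minimal, hedged sketch of that control flow, using simplified free functions and a stand-alone `ScalingStrategy` enum rather than the real `InstanceManager` methods and signatures:

```python
from enum import Enum
from typing import Dict, List


class ScalingStrategy(Enum):
    ALL_OR_NOTHING = "all_or_nothing"
    BEST_EFFORT = "best_effort"


def launch_instances(node_list: List[str], skip_launch: bool = False) -> List[str]:
    """Pretend to launch one EC2 instance per node unless skip_launch is set."""
    if node_list and not skip_launch:
        return [f"i-{index:04d}" for index, _ in enumerate(node_list)]
    return []


def add_instances_for_nodes(node_list: List[str], skip_launch: bool = False) -> List[str]:
    """Job-level scaling step: launches only when not told to skip."""
    return launch_instances(node_list, skip_launch=skip_launch)


def scaling_for_jobs_multi_node(jobs: Dict[str, List[str]], scaling_strategy: ScalingStrategy) -> None:
    # All-in launch, once, for every node requested by the batch of jobs.
    all_nodes = [node for nodes in jobs.values() for node in nodes]
    launch_instances(all_nodes)

    # With BEST_EFFORT the all-in launch above is the only launch: jobs that did not
    # get capacity there must not trigger a second, job-level launch attempt.
    skip_launch = scaling_strategy == ScalingStrategy.BEST_EFFORT

    for _job_id, node_list in jobs.items():
        add_instances_for_nodes(node_list, skip_launch=skip_launch)
```

The real methods additionally handle launch batching, node-address updates, and tracking of unused launched instances; the sketch only shows where the skip decision is made and where it is honored.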
15 changes: 14 additions & 1 deletion src/slurm_plugin/instance_manager.py
@@ -575,6 +575,7 @@ def _scaling_for_jobs(
assign_node_batch_size: int,
update_node_address: bool,
scaling_strategy: ScalingStrategy,
skip_launch: bool = False,
) -> None:
"""Scaling for job list."""
# Setup custom logging filter
@@ -592,6 +593,7 @@ def _scaling_for_jobs(
assign_node_batch_size=assign_node_batch_size,
update_node_address=update_node_address,
scaling_strategy=scaling_strategy,
skip_launch=skip_launch,
)

def _terminate_unassigned_launched_instances(self, terminate_batch_size):
@@ -695,12 +697,18 @@ def _scaling_for_jobs_multi_node(
),
)

# Avoid a job-level launch if the scaling strategy is BEST_EFFORT.
# The all-in launch has already been performed, so from this point on we skip the extra job-level
# launch of instances for jobs that could not get the needed capacity from the all-in scaling.
skip_launch = scaling_strategy == ScalingStrategy.BEST_EFFORT

self._scaling_for_jobs(
job_list=job_list,
launch_batch_size=launch_batch_size,
assign_node_batch_size=assign_node_batch_size,
update_node_address=update_node_address,
scaling_strategy=scaling_strategy,
skip_launch=skip_launch,
)

def _get_slurm_resume_data(self, slurm_resume: Dict[str, any], node_list: List[str]) -> SlurmResumeData:
@@ -838,6 +846,7 @@ def _add_instances_for_nodes(
scaling_strategy: ScalingStrategy = ScalingStrategy.ALL_OR_NOTHING,
node_list: List[str] = None,
job: SlurmResumeJob = None,
skip_launch: bool = False,
):
"""Launch requested EC2 instances for nodes."""
nodes_resume_mapping = self._parse_nodes_resume_list(node_list=node_list)
@@ -859,6 +868,7 @@
nodes_to_launch=nodes_resume_mapping,
launch_batch_size=launch_batch_size,
scaling_strategy=scaling_strategy,
skip_launch=skip_launch,
)
# instances launched, e.g.
# {
@@ -873,6 +883,7 @@
q_cr_instances_launched_length = len(instances_launched.get(queue, {}).get(compute_resource, []))
successful_launched_nodes += slurm_node_list[:q_cr_instances_launched_length]
failed_launch_nodes += slurm_node_list[q_cr_instances_launched_length:]

if scaling_strategy == ScalingStrategy.ALL_OR_NOTHING:
self.all_or_nothing_node_assignment(
assign_node_batch_size=assign_node_batch_size,
@@ -994,8 +1005,10 @@ def _launch_instances(
launch_batch_size: int,
scaling_strategy: ScalingStrategy,
job: SlurmResumeJob = None,
skip_launch: bool = False,
):
instances_launched = defaultdict(lambda: defaultdict(list))

for queue, compute_resources in nodes_to_launch.items():
for compute_resource, slurm_node_list in compute_resources.items():
slurm_node_list = self._resize_slurm_node_list(
@@ -1005,7 +1018,7 @@
slurm_node_list=slurm_node_list,
)

if slurm_node_list:
if slurm_node_list and not skip_launch:
logger.info(
"Launching %s instances for nodes %s",
scaling_strategy,
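
Inside `_launch_instances` the only behavioral change is the extra `not skip_launch` condition on the launch loop. A stand-alone illustration of that guard follows; `fleet_launch` is a hypothetical stand-in for the real fleet-manager call, and the nested-dict shape mirrors the queue → compute resource → node list mapping used in the diff:

```python
from collections import defaultdict
from typing import Dict, List


def fleet_launch(slurm_node_list: List[str]) -> List[str]:
    # Hypothetical stand-in for the real EC2 fleet launch; returns one fake instance id per node.
    return [f"i-{index:04d}" for index, _ in enumerate(slurm_node_list)]


def launch_instances(
    nodes_to_launch: Dict[str, Dict[str, List[str]]],
    skip_launch: bool = False,
) -> Dict[str, Dict[str, List[str]]]:
    """Return launched instances per queue and compute resource, honoring skip_launch."""
    instances_launched: Dict[str, Dict[str, List[str]]] = defaultdict(lambda: defaultdict(list))
    for queue, compute_resources in nodes_to_launch.items():
        for compute_resource, slurm_node_list in compute_resources.items():
            # The new guard: an empty node list OR skip_launch=True means no launch call at all.
            if slurm_node_list and not skip_launch:
                instances_launched[queue][compute_resource] = fleet_launch(slurm_node_list)
    return instances_launched


if __name__ == "__main__":
    nodes = {"queue4": {"c5xlarge": ["queue4-st-c5xlarge-1"]}}
    assert launch_instances(nodes, skip_launch=True) == {}
    assert launch_instances(nodes)["queue4"]["c5xlarge"] == ["i-0000"]
```

Keeping the guard at this level means callers that still need a job-level launch (for example the ALL_OR_NOTHING path) are untouched: they simply pass the default `skip_launch=False`.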
37 changes: 32 additions & 5 deletions tests/slurm_plugin/test_instance_manager.py
@@ -2656,7 +2656,7 @@ def test_update_slurm_node_addrs(
assert_that(instance_manager.failed_nodes).is_empty()

@pytest.mark.parametrize(
"job, launch_batch_size, assign_node_batch_size, update_node_address, scaling_strategy, "
"job, launch_batch_size, assign_node_batch_size, update_node_address, scaling_strategy, skip_launch, "
"expected_nodes_to_launch, mock_instances_launched, initial_unused_launched_instances, "
"expected_unused_launched_instances, expect_assign_instances_to_nodes_called, "
"expect_assign_instances_to_nodes_failure, expected_failed_nodes",
@@ -2667,6 +2667,7 @@ def test_update_slurm_node_addrs(
2,
False,
ScalingStrategy.BEST_EFFORT,
False,
{"queue4": {"c5xlarge": ["queue4-st-c5xlarge-1"]}},
{},
{},
@@ -2681,6 +2682,7 @@ def test_update_slurm_node_addrs(
2,
False,
ScalingStrategy.ALL_OR_NOTHING,
False,
{"queue4": {"c5xlarge": ["queue4-st-c5xlarge-1"]}},
{},
{},
@@ -2695,6 +2697,7 @@ def test_update_slurm_node_addrs(
2,
False,
ScalingStrategy.ALL_OR_NOTHING,
False,
{"queue4": {"c5xlarge": ["queue4-st-c5xlarge-1"]}},
{
"queue4": {
@@ -2717,6 +2720,7 @@ def test_update_slurm_node_addrs(
2,
False,
ScalingStrategy.ALL_OR_NOTHING,
False,
{"queue4": {"c5xlarge": ["queue4-st-c5xlarge-1"]}},
{
"queue4": {
@@ -2744,6 +2748,7 @@ def test_update_slurm_node_addrs(
2,
False,
ScalingStrategy.ALL_OR_NOTHING,
False,
{"queue1": {"c5xlarge": ["queue1-st-c5xlarge-1"]}, "queue4": {"c5xlarge": ["queue4-st-c5xlarge-1"]}},
{
"queue4": {
@@ -2779,6 +2784,7 @@ def test_update_slurm_node_addrs(
2,
False,
ScalingStrategy.ALL_OR_NOTHING,
False,
{"queue1": {"c5xlarge": ["queue1-st-c5xlarge-1"]}, "queue4": {"c5xlarge": ["queue4-st-c5xlarge-1"]}},
{},
{},
@@ -2798,6 +2804,7 @@ def test_update_slurm_node_addrs(
2,
False,
ScalingStrategy.ALL_OR_NOTHING,
False,
{"queue1": {"c5xlarge": ["queue1-st-c5xlarge-1"]}},
{},
{},
@@ -2817,6 +2824,7 @@ def test_update_slurm_node_addrs(
2,
False,
ScalingStrategy.ALL_OR_NOTHING,
False,
{"queue1": {"c5xlarge": ["queue1-st-c5xlarge-1"]}},
{
"queue1": {
@@ -2844,6 +2852,7 @@ def test_update_slurm_node_addrs(
2,
False,
ScalingStrategy.ALL_OR_NOTHING,
False,
{"queue1": {"c5xlarge": ["queue1-st-c5xlarge-1"]}, "queue4": {"c5xlarge": ["queue4-st-c5xlarge-1"]}},
{
"queue4": {
@@ -2894,6 +2903,7 @@ def test_update_slurm_node_addrs(
2,
False,
ScalingStrategy.ALL_OR_NOTHING,
False,
{"queue1": {"c5xlarge": ["queue1-st-c5xlarge-1"]}, "queue4": {"c5xlarge": ["queue4-st-c5xlarge-1"]}},
{
"queue4": {
@@ -2940,6 +2950,7 @@ def test_add_instances_for_nodes(
assign_node_batch_size,
update_node_address,
scaling_strategy,
skip_launch,
expected_nodes_to_launch,
mock_instances_launched,
initial_unused_launched_instances,
@@ -2962,13 +2973,15 @@
assign_node_batch_size=assign_node_batch_size,
update_node_address=update_node_address,
scaling_strategy=scaling_strategy,
skip_launch=skip_launch,
)

instance_manager._launch_instances.assert_called_once_with(
job=job,
nodes_to_launch=expected_nodes_to_launch,
launch_batch_size=launch_batch_size,
scaling_strategy=scaling_strategy,
skip_launch=skip_launch,
)

assert_that(instance_manager.unused_launched_instances).is_equal_to(expected_unused_launched_instances)
@@ -3383,9 +3396,9 @@ def test_scaling_for_jobs_single_node(
)

@pytest.mark.parametrize(
"job_list, launch_batch_size, assign_node_batch_size, update_node_address, scaling_strategy",
"job_list, launch_batch_size, assign_node_batch_size, update_node_address, scaling_strategy, skip_launch",
[
([], 1, 2, True, False),
([], 1, 2, True, False, False),
(
[
SlurmResumeJob(
@@ -3399,6 +3412,7 @@ def test_scaling_for_jobs_single_node(
2,
True,
ScalingStrategy.ALL_OR_NOTHING,
False,
),
(
[
@@ -3419,6 +3433,7 @@ def test_scaling_for_jobs_single_node(
1,
False,
ScalingStrategy.ALL_OR_NOTHING,
False,
),
],
)
@@ -3431,6 +3446,7 @@ def test_scaling_for_jobs(
assign_node_batch_size,
update_node_address,
scaling_strategy,
skip_launch,
):
# patch internal functions
instance_manager._terminate_unassigned_launched_instances = mocker.MagicMock()
@@ -3445,6 +3461,7 @@ def test_scaling_for_jobs(
assign_node_batch_size=assign_node_batch_size,
update_node_address=update_node_address,
scaling_strategy=scaling_strategy,
skip_launch=skip_launch,
)

if not job_list:
@@ -3458,6 +3475,7 @@ def test_scaling_for_jobs(
assign_node_batch_size=assign_node_batch_size,
update_node_address=update_node_address,
scaling_strategy=scaling_strategy,
skip_launch=skip_launch,
)
setup_logging_filter.return_value.__enter__.return_value.set_custom_value.assert_any_call(job.job_id)
assert_that(
@@ -4103,18 +4121,20 @@ def test_all_or_nothing_node_assignment(
"scaling_strategy, "
"unused_launched_instances, "
"mock_launch_instances, "
"expected_unused_launched_instances",
"expected_unused_launched_instances, "
"expect_to_skip_job_level_launch",
[
(
[],
[],
["queue4-st-c5xlarge-1"],
1,
2,
False,
ScalingStrategy.BEST_EFFORT,
{},
{},
{},
True,
),
(
[],
@@ -4142,6 +4162,7 @@ def test_all_or_nothing_node_assignment(
]
}
},
True,
),
(
[],
@@ -4169,6 +4190,7 @@ def test_all_or_nothing_node_assignment(
]
}
},
False,
),
(
[],
@@ -4207,6 +4229,7 @@ def test_all_or_nothing_node_assignment(
]
}
},
False,
),
(
[
@@ -4256,6 +4279,7 @@ def test_all_or_nothing_node_assignment(
]
},
},
False,
),
],
)
@@ -4272,10 +4296,12 @@ def test_scaling_for_jobs_multi_node(
unused_launched_instances,
mock_launch_instances,
expected_unused_launched_instances,
expect_to_skip_job_level_launch,
):
# patch internal functions
instance_manager._launch_instances = mocker.MagicMock(return_value=mock_launch_instances)
instance_manager._scaling_for_jobs = mocker.MagicMock()

instance_manager.unused_launched_instances = unused_launched_instances

instance_manager._scaling_for_jobs_multi_node(
Expand All @@ -4293,6 +4319,7 @@ def test_scaling_for_jobs_multi_node(
assign_node_batch_size=assign_node_batch_size,
update_node_address=update_node_address,
scaling_strategy=scaling_strategy,
skip_launch=expect_to_skip_job_level_launch,
)

assert_that(instance_manager.unused_launched_instances).is_equal_to(expected_unused_launched_instances)
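
The parametrized test above checks that `_scaling_for_jobs_multi_node` forwards `skip_launch=True` to `_scaling_for_jobs` exactly when the strategy is BEST_EFFORT. A condensed, hedged version of that assertion pattern, using only `unittest.mock` and a simplified forwarding function rather than the project's fixtures:

```python
from enum import Enum
from unittest.mock import MagicMock


class ScalingStrategy(Enum):
    ALL_OR_NOTHING = "all_or_nothing"
    BEST_EFFORT = "best_effort"


def scaling_for_jobs_multi_node(scaling_for_jobs, job_list, scaling_strategy):
    # Same decision as in the production code: BEST_EFFORT means the all-in
    # launch already ran, so the per-job launch is skipped.
    skip_launch = scaling_strategy == ScalingStrategy.BEST_EFFORT
    scaling_for_jobs(job_list=job_list, skip_launch=skip_launch)


def test_best_effort_forwards_skip_launch():
    scaling_for_jobs = MagicMock()
    scaling_for_jobs_multi_node(scaling_for_jobs, job_list=[], scaling_strategy=ScalingStrategy.BEST_EFFORT)
    scaling_for_jobs.assert_called_once_with(job_list=[], skip_launch=True)


def test_all_or_nothing_keeps_job_level_launch():
    scaling_for_jobs = MagicMock()
    scaling_for_jobs_multi_node(scaling_for_jobs, job_list=[], scaling_strategy=ScalingStrategy.ALL_OR_NOTHING)
    scaling_for_jobs.assert_called_once_with(job_list=[], skip_launch=False)
```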
