Add internal flag to leave IDLE nodes running
Add an internal flag to leave IDLE nodes running at the end of the resume program execution.

Signed-off-by: Luca Carrogu <[email protected]>
lukeseawalker committed Sep 28, 2023
1 parent 523e274 commit 3b885d7
Showing 2 changed files with 43 additions and 24 deletions.
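In short, the new flag selects which node-assignment strategy the instance manager applies after launching EC2 capacity. The snippet below is a deliberately simplified, standalone sketch of that choice, not the plugin's assignment code (names such as assign_nodes are illustrative): with terminate_idle_nodes=True the assignment stays all-or-nothing, so a partial launch assigns nothing and every launched instance is handed back for termination, while terminate_idle_nodes=False falls back to a best-effort assignment that keeps the instances that did launch, leaving them running (IDLE) at the end of the resume execution.

```python
from typing import Dict, List, Tuple


def assign_nodes(
    requested_nodes: List[str],
    launched_instances: List[str],
    terminate_idle_nodes: bool = True,
) -> Tuple[Dict[str, str], List[str]]:
    """Return (node -> instance assignments, instances to hand back for termination)."""
    if terminate_idle_nodes and len(launched_instances) < len(requested_nodes):
        # All-or-nothing: a partial launch assigns nothing, so no IDLE instance survives.
        return {}, list(launched_instances)
    # Best-effort (or a complete launch): assign what is available and keep it running.
    assignments = dict(zip(requested_nodes, launched_instances))
    return assignments, []


if __name__ == "__main__":
    nodes = ["queue1-st-compute-1", "queue1-st-compute-2"]
    instances = ["i-0123456789abcdef0"]  # only one of the two requested instances launched
    print(assign_nodes(nodes, instances, terminate_idle_nodes=True))   # ({}, ['i-0123456789abcdef0'])
    print(assign_nodes(nodes, instances, terminate_idle_nodes=False))  # ({'queue1-st-compute-1': 'i-0123456789abcdef0'}, [])
```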
62 changes: 38 additions & 24 deletions src/slurm_plugin/instance_manager.py
@@ -79,6 +79,7 @@ def get_manager(
run_instances_overrides: dict = None,
create_fleet_overrides: dict = None,
job_level_scaling: bool = False,
terminate_idle_nodes: bool = True,
):
if job_level_scaling:
return JobLevelScalingInstanceManager(
@@ -94,6 +95,7 @@ def get_manager(
fleet_config=fleet_config,
run_instances_overrides=run_instances_overrides,
create_fleet_overrides=create_fleet_overrides,
terminate_idle_nodes=terminate_idle_nodes,
)
else:
return NodeListScalingInstanceManager(
@@ -535,6 +537,7 @@ def add_instances(
slurm_resume: Dict[str, any] = None,
assign_node_batch_size: int = None,
terminate_batch_size: int = None,
terminate_idle_nodes: bool = True,
):
"""Add EC2 instances to Slurm nodes."""
# Reset failed nodes pool
@@ -551,19 +554,22 @@ def add_instances(
assign_node_batch_size=assign_node_batch_size,
update_node_address=update_node_address,
all_or_nothing_batch=all_or_nothing_batch,
terminate_idle_nodes=terminate_idle_nodes,
)
else:
logger.error(
"Not possible to perform job level scaling because Slurm resume file content is empty. "
"Falling back to node list scaling"
)
logger.info("The nodes_resume list from Slurm Resume Program is %s", print_with_count(node_list))
# Fallback to a best-effort scaling on whole node list, with no IDLE instances left
self._add_instances_for_nodes(
node_list=node_list,
launch_batch_size=launch_batch_size,
assign_node_batch_size=assign_node_batch_size,
update_node_address=update_node_address,
all_or_nothing_batch=False, # TODO fallback is to perform a best-effort scaling on whole node list (with no IDLE instances)
all_or_nothing_batch=False,
terminate_idle_nodes=True,
)

self._terminate_unassigned_launched_instances(terminate_batch_size)
@@ -575,6 +581,7 @@ def _scaling_for_jobs(
assign_node_batch_size: int,
update_node_address: bool,
all_or_nothing_batch: bool,
terminate_idle_nodes: bool,
) -> None:
"""Scaling for job list."""
# Setup custom logging filter
@@ -592,6 +599,7 @@ def _scaling_for_jobs(
assign_node_batch_size=assign_node_batch_size,
update_node_address=update_node_address,
all_or_nothing_batch=all_or_nothing_batch,
terminate_idle_nodes=terminate_idle_nodes,
)

def _terminate_unassigned_launched_instances(self, terminate_batch_size):
@@ -621,14 +629,15 @@ def _scaling_for_jobs_single_node(
"""Scaling for job single node list."""
if job_list:
if len(job_list) == 1:
# TODO if there is only 1 job single node, think about moving the job into multinode scaling
# TODO if there is only 1 single-node job, think about moving this job into the multi-node scaling, so as to remove the if/else here
# call _scaling_for_jobs so that JobID is logged
self._scaling_for_jobs(
job_list=job_list,
launch_batch_size=launch_batch_size,
assign_node_batch_size=assign_node_batch_size,
update_node_address=update_node_address,
all_or_nothing_batch=all_or_nothing_batch,
terminate_idle_nodes=False,
)
else:
# Batch all single node jobs in a single best-effort EC2 launch request
@@ -641,6 +650,7 @@ def _scaling_for_jobs_single_node(
assign_node_batch_size=assign_node_batch_size,
update_node_address=update_node_address,
all_or_nothing_batch=False,
terminate_idle_nodes=False,
)

def _add_instances_for_resume_file(
@@ -651,6 +661,7 @@ def _add_instances_for_resume_file(
assign_node_batch_size: int,
update_node_address: bool = True,
all_or_nothing_batch: bool = False,
terminate_idle_nodes: bool = True,
):
"""Launch requested EC2 instances for resume file."""
slurm_resume_data = self._get_slurm_resume_data(slurm_resume=slurm_resume, node_list=node_list)
@@ -675,6 +686,7 @@ def _add_instances_for_resume_file(
assign_node_batch_size=assign_node_batch_size,
update_node_address=update_node_address,
all_or_nothing_batch=all_or_nothing_batch,
terminate_idle_nodes=terminate_idle_nodes,
)

def _scaling_for_jobs_multi_node(
@@ -685,6 +697,7 @@ def _scaling_for_jobs_multi_node(
assign_node_batch_size,
update_node_address,
all_or_nothing_batch: bool,
terminate_idle_nodes: bool,
):
# Optimize job level scaling with preliminary scale-all nodes attempt
self._update_dict(
@@ -702,6 +715,7 @@ def _scaling_for_jobs_multi_node(
assign_node_batch_size=assign_node_batch_size,
update_node_address=update_node_address,
all_or_nothing_batch=all_or_nothing_batch,
terminate_idle_nodes=terminate_idle_nodes,
)

def _get_slurm_resume_data(self, slurm_resume: Dict[str, any], node_list: List[str]) -> SlurmResumeData:
@@ -839,6 +853,7 @@ def _add_instances_for_nodes(
all_or_nothing_batch: bool = True,
node_list: List[str] = None,
job: SlurmResumeJob = None,
terminate_idle_nodes: bool = True,
):
"""Launch requested EC2 instances for nodes."""
nodes_resume_mapping = self._parse_nodes_resume_list(node_list=node_list)
@@ -855,7 +870,7 @@ def _add_instances_for_nodes(
# nodes in the resume flattened list, e.g.
# [nodes_1, nodes_2, nodes_3, nodes_4, nodes_5]

# TODO think about extracting out the allocation of unused instances from the _launch_instances call (inside _resize_slurm_node_list), so to simplify the if logic inside
# TODO think about extracting out the allocation of unused instances from the _launch_instances call (done inside _resize_slurm_node_list), so to simplify the if logic inside
instances_launched = self._launch_instances(
job=job if job else None,
nodes_to_launch=nodes_resume_mapping,
@@ -875,26 +890,25 @@ def _add_instances_for_nodes(
q_cr_instances_launched_length = len(instances_launched.get(queue, {}).get(compute_resource, []))
successful_launched_nodes += slurm_node_list[:q_cr_instances_launched_length]
failed_launch_nodes += slurm_node_list[q_cr_instances_launched_length:]
# TODO in both all-or-nothin and best-effort, the node assignment is all-or-nothing, so to not leave IDLE instances
# if all_or_nothing_batch:
self.all_or_nothing_node_assignment(
assign_node_batch_size=assign_node_batch_size,
instances_launched=instances_launched,
nodes_resume_list=nodes_resume_list,
nodes_resume_mapping=nodes_resume_mapping,
successful_launched_nodes=successful_launched_nodes,
update_node_address=update_node_address,
)
# else:
# self.best_effort_node_assignment(
# assign_node_batch_size=assign_node_batch_size,
# failed_launch_nodes=failed_launch_nodes,
# instances_launched=instances_launched,
# nodes_resume_list=nodes_resume_list,
# nodes_resume_mapping=nodes_resume_mapping,
# successful_launched_nodes=successful_launched_nodes,
# update_node_address=update_node_address,
# )
if terminate_idle_nodes:
self.all_or_nothing_node_assignment(
assign_node_batch_size=assign_node_batch_size,
instances_launched=instances_launched,
nodes_resume_list=nodes_resume_list,
nodes_resume_mapping=nodes_resume_mapping,
successful_launched_nodes=successful_launched_nodes,
update_node_address=update_node_address,
)
else:
self.best_effort_node_assignment(
assign_node_batch_size=assign_node_batch_size,
failed_launch_nodes=failed_launch_nodes,
instances_launched=instances_launched,
nodes_resume_list=nodes_resume_list,
nodes_resume_mapping=nodes_resume_mapping,
successful_launched_nodes=successful_launched_nodes,
update_node_address=update_node_address,
)

def _reset_failed_nodes(self, nodeset):
"""Remove nodeset from failed nodes dict."""
@@ -1009,7 +1023,7 @@ def _launch_instances(
)

# Avoid to launch instances per job for the best-effort case
if slurm_node_list and (all_or_nothing_batch or (not all_or_nothing_batch and not job)): # TODO simplify
if slurm_node_list and (all_or_nothing_batch or (not all_or_nothing_batch and not job)): # TODO simplify?
logger.info(
"Launching %s instances for nodes %s",
"all-or-nothing" if all_or_nothing_batch else "best-effort",
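The new parameter is threaded through several private methods of the instance manager. As a reading aid, here is an abridged, runnable outline of the propagation as it appears in this diff; it is not the real class, and signatures and bodies are reduced to the calls that touch the flag. The resume-file path forwards the caller's value to multi-node job scaling, single-node job scaling hard-codes terminate_idle_nodes=False, and the fallback path without a resume file hard-codes True.

```python
class JobLevelScalingOutline:
    """Abridged stand-in (not the plugin class) showing how terminate_idle_nodes propagates."""

    def add_instances(self, slurm_resume=None, node_list=None, terminate_idle_nodes=True):
        if slurm_resume and node_list:
            # Job-level scaling honours the value passed in by the caller.
            self._add_instances_for_resume_file(node_list, terminate_idle_nodes)
        else:
            # Fallback best-effort scaling on the whole node list leaves no IDLE instances.
            self._add_instances_for_nodes(node_list, terminate_idle_nodes=True)

    def _add_instances_for_resume_file(self, node_list, terminate_idle_nodes=True):
        # Multi-node job scaling forwards the flag unchanged ...
        self._scaling_for_jobs_multi_node(node_list, terminate_idle_nodes=terminate_idle_nodes)
        # ... while single-node job scaling always disables it in this commit.
        self._scaling_for_jobs_single_node(node_list)

    def _scaling_for_jobs_multi_node(self, node_list, terminate_idle_nodes=True):
        self._add_instances_for_nodes(node_list, terminate_idle_nodes=terminate_idle_nodes)

    def _scaling_for_jobs_single_node(self, node_list):
        self._add_instances_for_nodes(node_list, terminate_idle_nodes=False)

    def _add_instances_for_nodes(self, node_list, terminate_idle_nodes=True):
        strategy = "all-or-nothing" if terminate_idle_nodes else "best-effort"
        print(f"assigning {node_list} with {strategy} node assignment")


if __name__ == "__main__":
    JobLevelScalingOutline().add_instances(
        slurm_resume={"jobs": ["dummy"]}, node_list=["queue1-st-compute-1"]
    )
```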
5 changes: 5 additions & 0 deletions src/slurm_plugin/resume.py
@@ -47,6 +47,7 @@ class SlurmResumeConfig:
"fleet_config_file": "/etc/parallelcluster/slurm_plugin/fleet-config.json",
"all_or_nothing_batch": True,
"job_level_scaling": True,
"terminate_idle_nodes": True,
}

def __init__(self, config_file_path):
@@ -95,6 +96,9 @@ def _get_config(self, config_file_path):
self.job_level_scaling = config.getboolean(
"slurm_resume", "job_level_scaling", fallback=self.DEFAULTS.get("job_level_scaling")
)
self.terminate_idle_nodes = config.getboolean(
"slurm_resume", "terminate_idle_nodes", fallback=self.DEFAULTS.get("terminate_idle_nodes")
)
fleet_config_file = config.get(
"slurm_resume", "fleet_config_file", fallback=self.DEFAULTS.get("fleet_config_file")
)
@@ -214,6 +218,7 @@ def _resume(arg_nodes, resume_config, slurm_resume):
terminate_batch_size=resume_config.terminate_max_batch_size,
update_node_address=resume_config.update_node_address,
all_or_nothing_batch=resume_config.all_or_nothing_batch,
terminate_idle_nodes=resume_config.terminate_idle_nodes,
)
failed_nodes = set().union(*instance_manager.failed_nodes.values())
success_nodes = [node for node in node_list if node not in failed_nodes]
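For reference, the option added to SlurmResumeConfig above follows the same configparser pattern as the existing booleans in that class. Below is a minimal standalone sketch of the parsing; only the section and option names come from the diff, the inline config text is illustrative.

```python
import configparser

DEFAULTS = {"terminate_idle_nodes": True}

config = configparser.ConfigParser()
# Illustrative config content; in the plugin this comes from the slurm_resume config file.
config.read_string("[slurm_resume]\nterminate_idle_nodes = false\n")

terminate_idle_nodes = config.getboolean(
    "slurm_resume", "terminate_idle_nodes", fallback=DEFAULTS.get("terminate_idle_nodes")
)
print(terminate_idle_nodes)  # False when explicitly disabled; True when the option is absent
```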
