Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[develop] Remove separation between oversubscribe and no-oversubscribe jobs #574

Merged
merged 1 commit into from
Oct 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 24 additions & 54 deletions src/slurm_plugin/instance_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ def _scaling_for_jobs(
for job in job_list:
job_id_logging_filter.set_custom_value(job.job_id)

logger.debug(f"No oversubscribe Job info: {job}")
logger.debug(f"Job info: {job}")

logger.info("The nodes_resume list from Slurm Resume File is %s", print_with_count(job.nodes_resume))
self._add_instances_for_nodes(
Expand Down Expand Up @@ -648,22 +648,19 @@ def _add_instances_for_resume_file(
"""Launch requested EC2 instances for resume file."""
slurm_resume_data = self._get_slurm_resume_data(slurm_resume=slurm_resume, node_list=node_list)

# Node scaling for no oversubscribe nodes
self._clear_unused_launched_instances()

self._scaling_for_jobs_single_node(
job_list=slurm_resume_data.jobs_single_node_no_oversubscribe
+ slurm_resume_data.jobs_single_node_oversubscribe,
job_list=slurm_resume_data.jobs_single_node,
launch_batch_size=launch_batch_size,
assign_node_batch_size=assign_node_batch_size,
update_node_address=update_node_address,
scaling_strategy=scaling_strategy,
)

self._scaling_for_jobs_multi_node(
job_list=slurm_resume_data.jobs_multi_node_no_oversubscribe
+ slurm_resume_data.jobs_multi_node_oversubscribe,
node_list=slurm_resume_data.multi_node_no_oversubscribe + slurm_resume_data.multi_node_oversubscribe,
job_list=slurm_resume_data.jobs_multi_node,
node_list=slurm_resume_data.multi_node,
launch_batch_size=launch_batch_size,
assign_node_batch_size=assign_node_batch_size,
update_node_address=update_node_address,
Expand Down Expand Up @@ -708,12 +705,10 @@ def _get_slurm_resume_data(self, slurm_resume: Dict[str, any], node_list: List[s
Get SlurmResumeData object.

SlurmResumeData object contains the following:
* the node list for jobs with oversubscribe != NO
* the node list for jobs with oversubscribe == NO
* the job list with single node allocation with oversubscribe != NO
* the job list with multi node allocation with oversubscribe != NO
* the job list with single node allocation with oversubscribe == NO
* the job list with multi node allocation with oversubscribe == NO
* the node list for jobs allocated to single node
* the node list for jobs allocated to multiple nodes
* the job list with single node allocation
* the job list with multi node allocation

Example of Slurm Resume File (ref. https://slurm.schedmd.com/elastic_computing.html):
{
Expand Down Expand Up @@ -752,59 +747,34 @@ def _get_slurm_resume_data(self, slurm_resume: Dict[str, any], node_list: List[s
],
}
"""
jobs_single_node_no_oversubscribe = []
jobs_multi_node_no_oversubscribe = []
jobs_single_node_oversubscribe = []
jobs_multi_node_oversubscribe = []
single_node_no_oversubscribe = []
multi_node_no_oversubscribe = []
single_node_oversubscribe = []
multi_node_oversubscribe = []
jobs_single_node = []
jobs_multi_node = []
single_node = []
multi_node = []

slurm_resume_jobs = self._parse_slurm_resume(slurm_resume)

for job in slurm_resume_jobs:
if job.is_exclusive():
if len(job.nodes_resume) == 1:
jobs_single_node_no_oversubscribe.append(job)
single_node_no_oversubscribe.extend(job.nodes_resume)
else:
jobs_multi_node_no_oversubscribe.append(job)
multi_node_no_oversubscribe.extend(job.nodes_resume)
if len(job.nodes_resume) == 1:
jobs_single_node.append(job)
single_node.extend(job.nodes_resume)
else:
if len(job.nodes_resume) == 1:
jobs_single_node_oversubscribe.append(job)
single_node_oversubscribe.extend(job.nodes_resume)
else:
jobs_multi_node_oversubscribe.append(job)
multi_node_oversubscribe.extend(job.nodes_resume)

nodes_difference = list(
set(node_list)
- (
set(single_node_oversubscribe)
| set(multi_node_oversubscribe)
| set(single_node_no_oversubscribe)
| set(multi_node_no_oversubscribe)
)
)
jobs_multi_node.append(job)
multi_node.extend(job.nodes_resume)

nodes_difference = list(set(node_list) - (set(single_node) | set(multi_node)))

if nodes_difference:
logger.warning(
"Discarding NodeNames because of mismatch in Slurm Resume File Vs Nodes passed to Resume Program: %s",
", ".join(nodes_difference),
)
self._update_failed_nodes(set(nodes_difference), "InvalidNodenameError")
return SlurmResumeData(
# With Oversubscribe
single_node_oversubscribe=list(dict.fromkeys(single_node_oversubscribe)),
multi_node_oversubscribe=list(dict.fromkeys(multi_node_oversubscribe)),
jobs_single_node_oversubscribe=jobs_single_node_oversubscribe,
jobs_multi_node_oversubscribe=jobs_multi_node_oversubscribe,
# With No Oversubscribe
single_node_no_oversubscribe=single_node_no_oversubscribe,
multi_node_no_oversubscribe=multi_node_no_oversubscribe,
jobs_single_node_no_oversubscribe=jobs_single_node_no_oversubscribe,
jobs_multi_node_no_oversubscribe=jobs_multi_node_no_oversubscribe,
single_node=list(dict.fromkeys(single_node)),
multi_node=list(dict.fromkeys(multi_node)),
jobs_single_node=jobs_single_node,
jobs_multi_node=jobs_multi_node,
)

def _parse_slurm_resume(self, slurm_resume: Dict[str, any]) -> List[SlurmResumeJob]:
Expand Down
24 changes: 8 additions & 16 deletions src/slurm_plugin/slurm_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,22 +150,14 @@ def __hash__(self):

@dataclass
class SlurmResumeData:
# List of exclusive job allocated to 1 node each
jobs_single_node_no_oversubscribe: List[SlurmResumeJob]
# List of exclusive job allocated to more than 1 node each
jobs_multi_node_no_oversubscribe: List[SlurmResumeJob]
# List of non-exclusive job allocated to 1 node each
jobs_single_node_oversubscribe: List[SlurmResumeJob]
# List of non-exclusive job allocated to more than 1 node each
jobs_multi_node_oversubscribe: List[SlurmResumeJob]
# List of node allocated to single node exclusive job
single_node_no_oversubscribe: List[str]
# List of node allocated to multiple node exclusive job
multi_node_no_oversubscribe: List[str]
# List of node allocated to single node non-exclusive job
single_node_oversubscribe: List[str]
# List of node allocated to multiple node non-exclusive job
multi_node_oversubscribe: List[str]
# List of job allocated to 1 node each
jobs_single_node: List[SlurmResumeJob]
# List of job allocated to more than 1 node each
jobs_multi_node: List[SlurmResumeJob]
# List of node allocated to single node job
single_node: List[str]
# List of node allocated to multiple node job
multi_node: List[str]


class SlurmNode(metaclass=ABCMeta):
Expand Down
Loading
Loading