From d106ccc3506a4ebb274714d43af664f122e2f11e Mon Sep 17 00:00:00 2001 From: Luca Carrogu Date: Fri, 13 Oct 2023 11:05:02 +0200 Subject: [PATCH] Remove separation between oversubscribe and no-oversubscribe jobs Signed-off-by: Luca Carrogu --- src/slurm_plugin/instance_manager.py | 78 ++++------- src/slurm_plugin/slurm_resources.py | 24 ++-- tests/slurm_plugin/test_instance_manager.py | 135 +++++++------------- 3 files changed, 80 insertions(+), 157 deletions(-) diff --git a/src/slurm_plugin/instance_manager.py b/src/slurm_plugin/instance_manager.py index 65b0d3d8b..e021a613f 100644 --- a/src/slurm_plugin/instance_manager.py +++ b/src/slurm_plugin/instance_manager.py @@ -575,7 +575,7 @@ def _scaling_for_jobs( for job in job_list: job_id_logging_filter.set_custom_value(job.job_id) - logger.debug(f"No oversubscribe Job info: {job}") + logger.debug(f"Job info: {job}") logger.info("The nodes_resume list from Slurm Resume File is %s", print_with_count(job.nodes_resume)) self._add_instances_for_nodes( @@ -648,12 +648,10 @@ def _add_instances_for_resume_file( """Launch requested EC2 instances for resume file.""" slurm_resume_data = self._get_slurm_resume_data(slurm_resume=slurm_resume, node_list=node_list) - # Node scaling for no oversubscribe nodes self._clear_unused_launched_instances() self._scaling_for_jobs_single_node( - job_list=slurm_resume_data.jobs_single_node_no_oversubscribe - + slurm_resume_data.jobs_single_node_oversubscribe, + job_list=slurm_resume_data.jobs_single_node, launch_batch_size=launch_batch_size, assign_node_batch_size=assign_node_batch_size, update_node_address=update_node_address, @@ -661,9 +659,8 @@ def _add_instances_for_resume_file( ) self._scaling_for_jobs_multi_node( - job_list=slurm_resume_data.jobs_multi_node_no_oversubscribe - + slurm_resume_data.jobs_multi_node_oversubscribe, - node_list=slurm_resume_data.multi_node_no_oversubscribe + slurm_resume_data.multi_node_oversubscribe, + job_list=slurm_resume_data.jobs_multi_node, + node_list=slurm_resume_data.multi_node, launch_batch_size=launch_batch_size, assign_node_batch_size=assign_node_batch_size, update_node_address=update_node_address, @@ -708,12 +705,10 @@ def _get_slurm_resume_data(self, slurm_resume: Dict[str, any], node_list: List[s Get SlurmResumeData object. SlurmResumeData object contains the following: - * the node list for jobs with oversubscribe != NO - * the node list for jobs with oversubscribe == NO - * the job list with single node allocation with oversubscribe != NO - * the job list with multi node allocation with oversubscribe != NO - * the job list with single node allocation with oversubscribe == NO - * the job list with multi node allocation with oversubscribe == NO + * the node list for jobs allocated to single node + * the node list for jobs allocated to multiple nodes + * the job list with single node allocation + * the job list with multi node allocation Example of Slurm Resume File (ref. 
https://slurm.schedmd.com/elastic_computing.html): { @@ -752,42 +747,23 @@ def _get_slurm_resume_data(self, slurm_resume: Dict[str, any], node_list: List[s ], } """ - jobs_single_node_no_oversubscribe = [] - jobs_multi_node_no_oversubscribe = [] - jobs_single_node_oversubscribe = [] - jobs_multi_node_oversubscribe = [] - single_node_no_oversubscribe = [] - multi_node_no_oversubscribe = [] - single_node_oversubscribe = [] - multi_node_oversubscribe = [] + jobs_single_node = [] + jobs_multi_node = [] + single_node = [] + multi_node = [] slurm_resume_jobs = self._parse_slurm_resume(slurm_resume) for job in slurm_resume_jobs: - if job.is_exclusive(): - if len(job.nodes_resume) == 1: - jobs_single_node_no_oversubscribe.append(job) - single_node_no_oversubscribe.extend(job.nodes_resume) - else: - jobs_multi_node_no_oversubscribe.append(job) - multi_node_no_oversubscribe.extend(job.nodes_resume) + if len(job.nodes_resume) == 1: + jobs_single_node.append(job) + single_node.extend(job.nodes_resume) else: - if len(job.nodes_resume) == 1: - jobs_single_node_oversubscribe.append(job) - single_node_oversubscribe.extend(job.nodes_resume) - else: - jobs_multi_node_oversubscribe.append(job) - multi_node_oversubscribe.extend(job.nodes_resume) - - nodes_difference = list( - set(node_list) - - ( - set(single_node_oversubscribe) - | set(multi_node_oversubscribe) - | set(single_node_no_oversubscribe) - | set(multi_node_no_oversubscribe) - ) - ) + jobs_multi_node.append(job) + multi_node.extend(job.nodes_resume) + + nodes_difference = list(set(node_list) - (set(single_node) | set(multi_node))) + if nodes_difference: logger.warning( "Discarding NodeNames because of mismatch in Slurm Resume File Vs Nodes passed to Resume Program: %s", @@ -795,16 +771,10 @@ def _get_slurm_resume_data(self, slurm_resume: Dict[str, any], node_list: List[s ) self._update_failed_nodes(set(nodes_difference), "InvalidNodenameError") return SlurmResumeData( - # With Oversubscribe - single_node_oversubscribe=list(dict.fromkeys(single_node_oversubscribe)), - multi_node_oversubscribe=list(dict.fromkeys(multi_node_oversubscribe)), - jobs_single_node_oversubscribe=jobs_single_node_oversubscribe, - jobs_multi_node_oversubscribe=jobs_multi_node_oversubscribe, - # With No Oversubscribe - single_node_no_oversubscribe=single_node_no_oversubscribe, - multi_node_no_oversubscribe=multi_node_no_oversubscribe, - jobs_single_node_no_oversubscribe=jobs_single_node_no_oversubscribe, - jobs_multi_node_no_oversubscribe=jobs_multi_node_no_oversubscribe, + single_node=list(dict.fromkeys(single_node)), + multi_node=list(dict.fromkeys(multi_node)), + jobs_single_node=jobs_single_node, + jobs_multi_node=jobs_multi_node, ) def _parse_slurm_resume(self, slurm_resume: Dict[str, any]) -> List[SlurmResumeJob]: diff --git a/src/slurm_plugin/slurm_resources.py b/src/slurm_plugin/slurm_resources.py index 3924db235..abf85413f 100644 --- a/src/slurm_plugin/slurm_resources.py +++ b/src/slurm_plugin/slurm_resources.py @@ -150,22 +150,14 @@ def __hash__(self): @dataclass class SlurmResumeData: - # List of exclusive job allocated to 1 node each - jobs_single_node_no_oversubscribe: List[SlurmResumeJob] - # List of exclusive job allocated to more than 1 node each - jobs_multi_node_no_oversubscribe: List[SlurmResumeJob] - # List of non-exclusive job allocated to 1 node each - jobs_single_node_oversubscribe: List[SlurmResumeJob] - # List of non-exclusive job allocated to more than 1 node each - jobs_multi_node_oversubscribe: List[SlurmResumeJob] - # List of node allocated to 
single node exclusive job - single_node_no_oversubscribe: List[str] - # List of node allocated to multiple node exclusive job - multi_node_no_oversubscribe: List[str] - # List of node allocated to single node non-exclusive job - single_node_oversubscribe: List[str] - # List of node allocated to multiple node non-exclusive job - multi_node_oversubscribe: List[str] + # List of job allocated to 1 node each + jobs_single_node: List[SlurmResumeJob] + # List of job allocated to more than 1 node each + jobs_multi_node: List[SlurmResumeJob] + # List of node allocated to single node job + single_node: List[str] + # List of node allocated to multiple node job + multi_node: List[str] class SlurmNode(metaclass=ABCMeta): diff --git a/tests/slurm_plugin/test_instance_manager.py b/tests/slurm_plugin/test_instance_manager.py index 6c9daf3ce..f1d87d4e9 100644 --- a/tests/slurm_plugin/test_instance_manager.py +++ b/tests/slurm_plugin/test_instance_manager.py @@ -1486,12 +1486,9 @@ def test_add_instances( "assign_node_batch_size", "update_node_address", "scaling_strategy", - "expected_jobs_multi_node_oversubscribe", - "expected_multi_node_oversubscribe", - "expected_jobs_single_node_oversubscribe", - "expected_jobs_multi_node_no_oversubscribe", - "expected_multi_node_no_oversubscribe", - "expected_jobs_single_node_no_oversubscribe", + "expected_jobs_multi_node", + "expected_multi_node", + "expected_jobs_single_node", ), [ ( @@ -1554,21 +1551,6 @@ def test_add_instances( nodes_resume="queue1-st-c5xlarge-[1-3]", oversubscribe="YES", ), - SlurmResumeJob( - job_id=140818, - nodes_alloc="queue1-st-c5xlarge-[1-3], queue4-st-c5xlarge-11", - nodes_resume="queue1-st-c5xlarge-[1-3], queue4-st-c5xlarge-11", - oversubscribe="OK", - ), - ], - [ - "queue1-st-c5xlarge-1", - "queue1-st-c5xlarge-2", - "queue1-st-c5xlarge-3", - "queue4-st-c5xlarge-11", - ], - [], - [ SlurmResumeJob( job_id=140815, nodes_alloc="queue2-st-c5xlarge-[1-3]", @@ -1581,14 +1563,24 @@ def test_add_instances( nodes_resume="queue3-st-c5xlarge-[7-9]", oversubscribe="NO", ), + SlurmResumeJob( + job_id=140818, + nodes_alloc="queue1-st-c5xlarge-[1-3], queue4-st-c5xlarge-11", + nodes_resume="queue1-st-c5xlarge-[1-3], queue4-st-c5xlarge-11", + oversubscribe="OK", + ), ], [ + "queue1-st-c5xlarge-1", + "queue1-st-c5xlarge-2", + "queue1-st-c5xlarge-3", "queue2-st-c5xlarge-1", "queue2-st-c5xlarge-2", "queue2-st-c5xlarge-3", "queue3-st-c5xlarge-7", "queue3-st-c5xlarge-8", "queue3-st-c5xlarge-9", + "queue4-st-c5xlarge-11", ], [], ), @@ -1627,9 +1619,6 @@ def test_add_instances( "queue1-st-c5xlarge-3", ], [], - [], - [], - [], ), ( { @@ -1650,9 +1639,6 @@ def test_add_instances( ScalingStrategy.BEST_EFFORT, [], [], - [], - [], - [], [ SlurmResumeJob( job_id=140814, @@ -1696,16 +1682,6 @@ def test_add_instances( 28, True, ScalingStrategy.BEST_EFFORT, - [], - [], - [ - SlurmResumeJob( - job_id=140816, - nodes_alloc="queue3-st-c5xlarge-1", - nodes_resume="queue3-st-c5xlarge-1", - oversubscribe="YES", - ), - ], [ SlurmResumeJob( job_id=140815, @@ -1722,6 +1698,12 @@ def test_add_instances( nodes_resume="queue1-st-c5xlarge-1", oversubscribe="NO", ), + SlurmResumeJob( + job_id=140816, + nodes_alloc="queue3-st-c5xlarge-1", + nodes_resume="queue3-st-c5xlarge-1", + oversubscribe="YES", + ), ], ), ], @@ -1734,12 +1716,9 @@ def test_add_instances_for_resume_file( assign_node_batch_size, update_node_address, scaling_strategy, - expected_jobs_multi_node_oversubscribe, - expected_multi_node_oversubscribe, - expected_jobs_single_node_oversubscribe, - 
expected_jobs_multi_node_no_oversubscribe, - expected_multi_node_no_oversubscribe, - expected_jobs_single_node_no_oversubscribe, + expected_jobs_multi_node, + expected_multi_node, + expected_jobs_single_node, instance_manager, mocker, ): @@ -1758,15 +1737,15 @@ def test_add_instances_for_resume_file( ) instance_manager._scaling_for_jobs_single_node.assert_any_call( - job_list=expected_jobs_single_node_no_oversubscribe + expected_jobs_single_node_oversubscribe, + job_list=expected_jobs_single_node, launch_batch_size=launch_batch_size, assign_node_batch_size=assign_node_batch_size, update_node_address=update_node_address, scaling_strategy=scaling_strategy, ) instance_manager._scaling_for_jobs_multi_node.assert_any_call( - job_list=expected_jobs_multi_node_no_oversubscribe + expected_jobs_multi_node_oversubscribe, - node_list=expected_multi_node_no_oversubscribe + expected_multi_node_oversubscribe, + job_list=expected_jobs_multi_node, + node_list=expected_multi_node, launch_batch_size=launch_batch_size, assign_node_batch_size=assign_node_batch_size, update_node_address=update_node_address, @@ -1777,10 +1756,8 @@ def test_add_instances_for_resume_file( assert_that(instance_manager._scaling_for_jobs_multi_node.call_count).is_equal_to(1) @pytest.mark.parametrize( - "slurm_resume, node_list, expected_single_node_oversubscribe, expected_multi_node_oversubscribe, " - "expected_jobs_single_node_oversubscribe, expected_jobs_multi_node_oversubscribe, " - "expected_single_node_no_oversubscribe, expected_multi_node_no_oversubscribe, " - "expected_jobs_single_node_no_oversubscribe, expected_jobs_multi_node_no_oversubscribe, " + "slurm_resume, node_list, expected_single_node, expected_multi_node, " + "expected_jobs_single_node, expected_jobs_multi_node, " "expected_nodes_difference", [ ( @@ -1852,24 +1829,12 @@ def test_add_instances_for_resume_file( "queue5-st-c5xlarge-1", "queue6-st-c5xlarge-1", ], - ["queue6-st-c5xlarge-1"], - ["queue1-st-c5xlarge-1", "queue1-st-c5xlarge-2", "queue1-st-c5xlarge-3", "queue4-st-c5xlarge-11"], - [ - SlurmResumeJob(140821, "queue6-st-c5xlarge-[1-2]", "queue6-st-c5xlarge-1", "YES"), - ], - [ - SlurmResumeJob(140814, "queue1-st-c5xlarge-[1-4]", "queue1-st-c5xlarge-[1-3]", "YES"), - SlurmResumeJob( - 140818, - "queue1-st-c5xlarge-[1-3], queue4-st-c5xlarge-11", - "queue1-st-c5xlarge-[1-3], queue4-st-c5xlarge-11", - "OK", - ), - ], - [ - "queue5-st-c5xlarge-1", - ], + ["queue5-st-c5xlarge-1", "queue6-st-c5xlarge-1"], [ + "queue1-st-c5xlarge-1", + "queue1-st-c5xlarge-2", + "queue1-st-c5xlarge-3", + "queue4-st-c5xlarge-11", "queue2-st-c5xlarge-1", "queue2-st-c5xlarge-2", "queue2-st-c5xlarge-3", @@ -1880,10 +1845,18 @@ def test_add_instances_for_resume_file( [ SlurmResumeJob(140819, "queue4-st-c5xlarge-1", "queue4-st-c5xlarge-1", "NO"), SlurmResumeJob(140820, "queue5-st-c5xlarge-[1-2]", "queue5-st-c5xlarge-1", "NO"), + SlurmResumeJob(140821, "queue6-st-c5xlarge-[1-2]", "queue6-st-c5xlarge-1", "YES"), ], [ + SlurmResumeJob(140814, "queue1-st-c5xlarge-[1-4]", "queue1-st-c5xlarge-[1-3]", "YES"), SlurmResumeJob(140815, "queue2-st-c5xlarge-[1-3]", "queue2-st-c5xlarge-[1-3]", "NO"), SlurmResumeJob(140816, "queue3-st-c5xlarge-[7-10]", "queue3-st-c5xlarge-[7-9]", "NO"), + SlurmResumeJob( + 140818, + "queue1-st-c5xlarge-[1-3], queue4-st-c5xlarge-11", + "queue1-st-c5xlarge-[1-3], queue4-st-c5xlarge-11", + "OK", + ), ], ["broken"], ), @@ -1893,14 +1866,10 @@ def test_get_slurm_resume_data( self, slurm_resume, node_list, - expected_single_node_oversubscribe, - 
expected_multi_node_oversubscribe, - expected_jobs_single_node_oversubscribe, - expected_jobs_multi_node_oversubscribe, - expected_single_node_no_oversubscribe, - expected_multi_node_no_oversubscribe, - expected_jobs_single_node_no_oversubscribe, - expected_jobs_multi_node_no_oversubscribe, + expected_single_node, + expected_multi_node, + expected_jobs_single_node, + expected_jobs_multi_node, instance_manager, expected_nodes_difference, mocker, @@ -1908,18 +1877,10 @@ def test_get_slurm_resume_data( ): mocker.patch("slurm_plugin.instance_manager.get_nodes_info", autospec=True) slurm_resume = instance_manager._get_slurm_resume_data(slurm_resume, node_list) - assert_that(slurm_resume.single_node_oversubscribe).contains(*expected_single_node_oversubscribe) - assert_that(slurm_resume.multi_node_oversubscribe).contains(*expected_multi_node_oversubscribe) - assert_that(slurm_resume.jobs_single_node_oversubscribe).is_equal_to(expected_jobs_single_node_oversubscribe) - assert_that(slurm_resume.jobs_multi_node_oversubscribe).is_equal_to(expected_jobs_multi_node_oversubscribe) - assert_that(slurm_resume.single_node_no_oversubscribe).contains(*expected_single_node_no_oversubscribe) - assert_that(slurm_resume.multi_node_no_oversubscribe).contains(*expected_multi_node_no_oversubscribe) - assert_that(slurm_resume.jobs_single_node_no_oversubscribe).is_equal_to( - expected_jobs_single_node_no_oversubscribe - ) - assert_that(slurm_resume.jobs_multi_node_no_oversubscribe).is_equal_to( - expected_jobs_multi_node_no_oversubscribe - ) + assert_that(slurm_resume.single_node).contains(*expected_single_node) + assert_that(slurm_resume.multi_node).contains(*expected_multi_node) + assert_that(slurm_resume.jobs_single_node).is_equal_to(expected_jobs_single_node) + assert_that(slurm_resume.jobs_multi_node).is_equal_to(expected_jobs_multi_node) if expected_nodes_difference: assert_that(caplog.text).contains( "Discarding NodeNames because of mismatch in Slurm Resume File Vs Nodes passed to Resume Program: "
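
Editor's note (not part of the patch): the behavioral core of this change is that _get_slurm_resume_data now buckets resume jobs purely by how many nodes each job resumes; the job's OVERSUBSCRIBE value is no longer consulted, so exclusive and oversubscribable jobs flow through the same _scaling_for_jobs_single_node / _scaling_for_jobs_multi_node calls, as the updated tests reflect. The following is a minimal, self-contained sketch of that partitioning logic under simplifying assumptions: ResumeJob, ResumeData and partition_resume_jobs are hypothetical stand-ins for the plugin's SlurmResumeJob, SlurmResumeData and InstanceManager._get_slurm_resume_data, and the sketch omits resume-file parsing and the mismatched-node warning path.

    # Illustrative sketch only -- simplified stand-ins, not the aws-parallelcluster-node code.
    from dataclasses import dataclass, field
    from typing import List


    @dataclass
    class ResumeJob:
        # Hypothetical, trimmed-down stand-in for SlurmResumeJob.
        job_id: int
        nodes_resume: List[str]
        oversubscribe: str = "NO"  # No longer used for partitioning after this patch.


    @dataclass
    class ResumeData:
        # Mirrors the simplified SlurmResumeData fields introduced by the patch.
        jobs_single_node: List[ResumeJob] = field(default_factory=list)
        jobs_multi_node: List[ResumeJob] = field(default_factory=list)
        single_node: List[str] = field(default_factory=list)
        multi_node: List[str] = field(default_factory=list)


    def partition_resume_jobs(jobs: List[ResumeJob]) -> ResumeData:
        """Split jobs by node count only, ignoring the OVERSUBSCRIBE setting."""
        data = ResumeData()
        for job in jobs:
            if len(job.nodes_resume) == 1:
                data.jobs_single_node.append(job)
                data.single_node.extend(job.nodes_resume)
            else:
                data.jobs_multi_node.append(job)
                data.multi_node.extend(job.nodes_resume)
        # De-duplicate node names while preserving order, as the patch does
        # with list(dict.fromkeys(...)).
        data.single_node = list(dict.fromkeys(data.single_node))
        data.multi_node = list(dict.fromkeys(data.multi_node))
        return data


    if __name__ == "__main__":
        jobs = [
            ResumeJob(140814, ["queue1-st-c5xlarge-1", "queue1-st-c5xlarge-2"], "YES"),
            ResumeJob(140819, ["queue4-st-c5xlarge-1"], "NO"),
        ]
        data = partition_resume_jobs(jobs)
        assert [j.job_id for j in data.jobs_single_node] == [140819]
        assert data.multi_node == ["queue1-st-c5xlarge-1", "queue1-st-c5xlarge-2"]

Under these assumptions, the sketch reproduces the simplified flow: one pass over the parsed jobs, one single-node bucket, one multi-node bucket, and ordered de-duplication of node names, with no branching on exclusivity.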