From fc55fff5ee47d62e8e822ba8cf17d1fb4bc4aed3 Mon Sep 17 00:00:00 2001 From: Luca Carrogu Date: Tue, 5 Sep 2023 14:17:23 +0200 Subject: [PATCH 1/5] Add temporary config to enable node sharing jls Add temporary config to be able to enable node sharing jls, useful when developing the feature Signed-off-by: Luca Carrogu --- src/slurm_plugin/instance_manager.py | 19 ++++++++++++------- src/slurm_plugin/resume.py | 5 +++++ tests/slurm_plugin/test_resume.py | 1 + 3 files changed, 18 insertions(+), 7 deletions(-) diff --git a/src/slurm_plugin/instance_manager.py b/src/slurm_plugin/instance_manager.py index 56b8c2021..ed8289b6d 100644 --- a/src/slurm_plugin/instance_manager.py +++ b/src/slurm_plugin/instance_manager.py @@ -72,6 +72,7 @@ def get_manager( run_instances_overrides: dict = None, create_fleet_overrides: dict = None, job_level_scaling: bool = False, + temp_jls_for_node_sharing: bool = False, ): if job_level_scaling: return JobLevelScalingInstanceManager( @@ -87,6 +88,7 @@ def get_manager( fleet_config=fleet_config, run_instances_overrides=run_instances_overrides, create_fleet_overrides=create_fleet_overrides, + temp_jls_for_node_sharing=temp_jls_for_node_sharing, ) else: return NodeListScalingInstanceManager( @@ -585,6 +587,7 @@ def __init__( fleet_config: Dict[str, any] = None, run_instances_overrides: dict = None, create_fleet_overrides: dict = None, + temp_jls_for_node_sharing: bool = False, ): super().__init__( region=region, @@ -601,6 +604,7 @@ def __init__( create_fleet_overrides=create_fleet_overrides, ) self.unused_launched_instances = {} + self.temp_jls_for_node_sharing = temp_jls_for_node_sharing def _clear_unused_launched_instances(self): """Clear and reset unused launched instances list.""" @@ -762,13 +766,14 @@ def _add_instances_for_resume_file( update_node_address=update_node_address, ) - # node scaling for oversubscribe nodes - self._scaling_for_nodes( - node_list=slurm_resume_data.nodes_oversubscribe, - launch_batch_size=launch_batch_size, - update_node_address=update_node_address, - all_or_nothing_batch=all_or_nothing_batch, - ) + if not self.temp_jls_for_node_sharing: + # node scaling for oversubscribe nodes + self._scaling_for_nodes( + node_list=slurm_resume_data.nodes_oversubscribe, + launch_batch_size=launch_batch_size, + update_node_address=update_node_address, + all_or_nothing_batch=all_or_nothing_batch, + ) def _scaling_for_jobs_multi_node( self, job_list, node_list, launch_batch_size, assign_node_batch_size, terminate_batch_size, update_node_address diff --git a/src/slurm_plugin/resume.py b/src/slurm_plugin/resume.py index 56db41a69..6300bd572 100644 --- a/src/slurm_plugin/resume.py +++ b/src/slurm_plugin/resume.py @@ -47,6 +47,7 @@ class SlurmResumeConfig: "fleet_config_file": "/etc/parallelcluster/slurm_plugin/fleet-config.json", "all_or_nothing_batch": False, "job_level_scaling": True, + "temp_jls_for_node_sharing": False, } def __init__(self, config_file_path): @@ -95,6 +96,9 @@ def _get_config(self, config_file_path): self.job_level_scaling = config.getboolean( "slurm_resume", "job_level_scaling", fallback=self.DEFAULTS.get("job_level_scaling") ) + self.temp_jls_for_node_sharing = config.getboolean( + "slurm_resume", "temp_jls_for_node_sharing", fallback=self.DEFAULTS.get("temp_jls_for_node_sharing") + ) fleet_config_file = config.get( "slurm_resume", "fleet_config_file", fallback=self.DEFAULTS.get("fleet_config_file") ) @@ -197,6 +201,7 @@ def _resume(arg_nodes, resume_config, slurm_resume): run_instances_overrides=resume_config.run_instances_overrides, 
create_fleet_overrides=resume_config.create_fleet_overrides, job_level_scaling=resume_config.job_level_scaling, + temp_jls_for_node_sharing=resume_config.temp_jls_for_node_sharing, ) instance_manager.add_instances( slurm_resume=slurm_resume, diff --git a/tests/slurm_plugin/test_resume.py b/tests/slurm_plugin/test_resume.py index e2c707a0a..a479699e2 100644 --- a/tests/slurm_plugin/test_resume.py +++ b/tests/slurm_plugin/test_resume.py @@ -694,6 +694,7 @@ def test_resume_launch( job_level_scaling=job_level_scaling, assign_node_max_batch_size=500, terminate_max_batch_size=1000, + temp_jls_for_node_sharing=False, ) mocker.patch("slurm_plugin.resume.is_clustermgtd_heartbeat_valid", autospec=True, return_value=is_heartbeat_valid) mock_handle_failed_nodes = mocker.patch("slurm_plugin.resume._handle_failed_nodes", autospec=True) From 10058932eeda13a6bb0375a52ec64d5c04803725 Mon Sep 17 00:00:00 2001 From: Luca Carrogu Date: Tue, 5 Sep 2023 14:48:08 +0200 Subject: [PATCH 2/5] Split oversubscribed job list Split oversubscribed job list, to have ready to consume knowledge about * oversubscribed job list with single node allocation * oversubscribed job list with multiple node allocation * oversubscribed single node list * oversubscribed multi node list Signed-off-by: Luca Carrogu --- src/slurm_plugin/instance_manager.py | 38 ++++++++++++++++----- src/slurm_plugin/slurm_resources.py | 12 ++++--- tests/slurm_plugin/test_instance_manager.py | 26 +++++++++++--- 3 files changed, 58 insertions(+), 18 deletions(-) diff --git a/src/slurm_plugin/instance_manager.py b/src/slurm_plugin/instance_manager.py index ed8289b6d..c7ee575ef 100644 --- a/src/slurm_plugin/instance_manager.py +++ b/src/slurm_plugin/instance_manager.py @@ -769,7 +769,11 @@ def _add_instances_for_resume_file( if not self.temp_jls_for_node_sharing: # node scaling for oversubscribe nodes self._scaling_for_nodes( - node_list=slurm_resume_data.nodes_oversubscribe, + node_list=list( + dict.fromkeys( + slurm_resume_data.single_node_oversubscribe + slurm_resume_data.multi_node_oversubscribe + ) + ), launch_batch_size=launch_batch_size, update_node_address=update_node_address, all_or_nothing_batch=all_or_nothing_batch, @@ -800,7 +804,8 @@ def _get_slurm_resume_data(self, slurm_resume: Dict[str, any], node_list: List[s SlurmResumeData object contains the following: * the node list for jobs with oversubscribe != NO * the node list for jobs with oversubscribe == NO - * the job list with oversubscribe != NO + * the job list with single node allocation with oversubscribe != NO + * the job list with multi node allocation with oversubscribe != NO * the job list with single node allocation with oversubscribe == NO * the job list with multi node allocation with oversubscribe == NO @@ -843,10 +848,12 @@ def _get_slurm_resume_data(self, slurm_resume: Dict[str, any], node_list: List[s """ jobs_single_node_no_oversubscribe = [] jobs_multi_node_no_oversubscribe = [] - jobs_oversubscribe = [] + jobs_single_node_oversubscribe = [] + jobs_multi_node_oversubscribe = [] single_node_no_oversubscribe = [] multi_node_no_oversubscribe = [] - nodes_oversubscribe = [] + single_node_oversubscribe = [] + multi_node_oversubscribe = [] slurm_resume_jobs = self._parse_slurm_resume(slurm_resume) @@ -859,12 +866,21 @@ def _get_slurm_resume_data(self, slurm_resume: Dict[str, any], node_list: List[s jobs_multi_node_no_oversubscribe.append(job) multi_node_no_oversubscribe.extend(job.nodes_resume) else: - jobs_oversubscribe.append(job) - nodes_oversubscribe.extend(job.nodes_resume) + 
if len(job.nodes_resume) == 1: + jobs_single_node_oversubscribe.append(job) + single_node_oversubscribe.extend(job.nodes_resume) + else: + jobs_multi_node_oversubscribe.append(job) + multi_node_oversubscribe.extend(job.nodes_resume) nodes_difference = list( set(node_list) - - (set(nodes_oversubscribe) | set(single_node_no_oversubscribe) | set(multi_node_no_oversubscribe)) + - ( + set(single_node_oversubscribe) + | set(multi_node_oversubscribe) + | set(single_node_no_oversubscribe) + | set(multi_node_no_oversubscribe) + ) ) if nodes_difference: logger.warning( @@ -873,8 +889,12 @@ def _get_slurm_resume_data(self, slurm_resume: Dict[str, any], node_list: List[s ) self._update_failed_nodes(set(nodes_difference), "InvalidNodenameError") return SlurmResumeData( - nodes_oversubscribe=list(dict.fromkeys(nodes_oversubscribe)), - jobs_oversubscribe=jobs_oversubscribe, + # With Oversubscribe + single_node_oversubscribe=list(dict.fromkeys(single_node_oversubscribe)), + multi_node_oversubscribe=list(dict.fromkeys(multi_node_oversubscribe)), + jobs_single_node_oversubscribe=jobs_single_node_oversubscribe, + jobs_multi_node_oversubscribe=jobs_multi_node_oversubscribe, + # With No Oversubscribe single_node_no_oversubscribe=single_node_no_oversubscribe, multi_node_no_oversubscribe=multi_node_no_oversubscribe, jobs_single_node_no_oversubscribe=jobs_single_node_no_oversubscribe, diff --git a/src/slurm_plugin/slurm_resources.py b/src/slurm_plugin/slurm_resources.py index 5ec1411c0..3924db235 100644 --- a/src/slurm_plugin/slurm_resources.py +++ b/src/slurm_plugin/slurm_resources.py @@ -154,14 +154,18 @@ class SlurmResumeData: jobs_single_node_no_oversubscribe: List[SlurmResumeJob] # List of exclusive job allocated to more than 1 node each jobs_multi_node_no_oversubscribe: List[SlurmResumeJob] - # List of non-exclusive job - jobs_oversubscribe: List[SlurmResumeJob] + # List of non-exclusive job allocated to 1 node each + jobs_single_node_oversubscribe: List[SlurmResumeJob] + # List of non-exclusive job allocated to more than 1 node each + jobs_multi_node_oversubscribe: List[SlurmResumeJob] # List of node allocated to single node exclusive job single_node_no_oversubscribe: List[str] # List of node allocated to multiple node exclusive job multi_node_no_oversubscribe: List[str] - # List of node allocated to non-exclusive job - nodes_oversubscribe: List[str] + # List of node allocated to single node non-exclusive job + single_node_oversubscribe: List[str] + # List of node allocated to multiple node non-exclusive job + multi_node_oversubscribe: List[str] class SlurmNode(metaclass=ABCMeta): diff --git a/tests/slurm_plugin/test_instance_manager.py b/tests/slurm_plugin/test_instance_manager.py index 622300eb6..be8fea8ec 100644 --- a/tests/slurm_plugin/test_instance_manager.py +++ b/tests/slurm_plugin/test_instance_manager.py @@ -1838,7 +1838,8 @@ def test_add_instances_for_resume_file( assert_that(instance_manager._scaling_for_nodes.call_count).is_equal_to(1) @pytest.mark.parametrize( - "slurm_resume, node_list, expected_nodes_oversubscribe, expected_jobs_oversubscribe, " + "slurm_resume, node_list, expected_single_node_oversubscribe, expected_multi_node_oversubscribe, " + "expected_jobs_single_node_oversubscribe, expected_jobs_multi_node_oversubscribe, " "expected_single_node_no_oversubscribe, expected_multi_node_no_oversubscribe, " "expected_jobs_single_node_no_oversubscribe, expected_jobs_multi_node_no_oversubscribe, " "expected_nodes_difference", @@ -1889,6 +1890,12 @@ def test_add_instances_for_resume_file( 
"nodes_resume": "queue5-st-c5xlarge-1", "oversubscribe": "NO", }, + { + "job_id": 140821, + "nodes_alloc": "queue6-st-c5xlarge-[1-2]", + "nodes_resume": "queue6-st-c5xlarge-1", + "oversubscribe": "YES", + }, ], }, [ @@ -1904,8 +1911,13 @@ def test_add_instances_for_resume_file( "broken", "queue4-st-c5xlarge-11", "queue5-st-c5xlarge-1", + "queue6-st-c5xlarge-1", ], + ["queue6-st-c5xlarge-1"], ["queue1-st-c5xlarge-1", "queue1-st-c5xlarge-2", "queue1-st-c5xlarge-3", "queue4-st-c5xlarge-11"], + [ + SlurmResumeJob(140821, "queue6-st-c5xlarge-[1-2]", "queue6-st-c5xlarge-1", "YES"), + ], [ SlurmResumeJob(140814, "queue1-st-c5xlarge-[1-4]", "queue1-st-c5xlarge-[1-3]", "YES"), SlurmResumeJob( @@ -1942,8 +1954,10 @@ def test_get_slurm_resume_data( self, slurm_resume, node_list, - expected_nodes_oversubscribe, - expected_jobs_oversubscribe, + expected_single_node_oversubscribe, + expected_multi_node_oversubscribe, + expected_jobs_single_node_oversubscribe, + expected_jobs_multi_node_oversubscribe, expected_single_node_no_oversubscribe, expected_multi_node_no_oversubscribe, expected_jobs_single_node_no_oversubscribe, @@ -1955,8 +1969,10 @@ def test_get_slurm_resume_data( ): mocker.patch("slurm_plugin.instance_manager.get_nodes_info", autospec=True) slurm_resume = instance_manager._get_slurm_resume_data(slurm_resume, node_list) - assert_that(slurm_resume.nodes_oversubscribe).contains(*expected_nodes_oversubscribe) - assert_that(slurm_resume.jobs_oversubscribe).is_equal_to(expected_jobs_oversubscribe) + assert_that(slurm_resume.single_node_oversubscribe).contains(*expected_single_node_oversubscribe) + assert_that(slurm_resume.multi_node_oversubscribe).contains(*expected_multi_node_oversubscribe) + assert_that(slurm_resume.jobs_single_node_oversubscribe).is_equal_to(expected_jobs_single_node_oversubscribe) + assert_that(slurm_resume.jobs_multi_node_oversubscribe).is_equal_to(expected_jobs_multi_node_oversubscribe) assert_that(slurm_resume.single_node_no_oversubscribe).contains(*expected_single_node_no_oversubscribe) assert_that(slurm_resume.multi_node_no_oversubscribe).contains(*expected_multi_node_no_oversubscribe) assert_that(slurm_resume.jobs_single_node_no_oversubscribe).is_equal_to( From 77a4b0230de1ead302b5ce6d5516354045ea7ba4 Mon Sep 17 00:00:00 2001 From: Luca Carrogu Date: Tue, 5 Sep 2023 15:21:26 +0200 Subject: [PATCH 3/5] Changelog for node sharing job level scaling Signed-off-by: Luca Carrogu --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e6fd79ccc..040c3886e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ This file is used to list changes made in each version of the aws-parallelcluste **ENHANCEMENTS** **CHANGES** +- WIP Perform default job-level scaling for all jobs, by reading job information from `SLURM_RESUME_FILE`. **BUG FIXES** From d0e73569ca34f0bedf60bf21122797030cb5ed97 Mon Sep 17 00:00:00 2001 From: Luca Carrogu Date: Wed, 6 Sep 2023 15:07:35 +0200 Subject: [PATCH 4/5] Change retry logic on DescribeInstances API call Change retry logic on DescribeInstances to be exponential backoff plus a random number in the interval 0 + 0.5. The random number is to add a jitter so to avoid wave requests. 
Retries have been increased from 4 to 5 Signed-off-by: Luca Carrogu --- src/slurm_plugin/fleet_manager.py | 12 +++++++----- tests/slurm_plugin/test_fleet_manager.py | 4 ++-- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/src/slurm_plugin/fleet_manager.py b/src/slurm_plugin/fleet_manager.py index 4d99db370..28baf0db8 100644 --- a/src/slurm_plugin/fleet_manager.py +++ b/src/slurm_plugin/fleet_manager.py @@ -11,6 +11,7 @@ import contextlib import copy import logging +import secrets import time from abc import ABC, abstractmethod @@ -383,15 +384,16 @@ def _get_instances_info(self, instance_ids: list): instances = [] partial_instance_ids = instance_ids - retry = 4 + retries = 5 + attempt_count = 0 # Wait for instances to be available in EC2 time.sleep(0.1) - while retry > 0 and partial_instance_ids: + while attempt_count < retries and partial_instance_ids: complete_instances, partial_instance_ids = self._retrieve_instances_info_from_ec2(partial_instance_ids) instances.extend(complete_instances) - retry = retry - 1 - if retry > 0: - time.sleep(0.3) + attempt_count += 1 + if attempt_count < retries: + time.sleep(0.3 * 2**attempt_count + (secrets.randbelow(500) / 1000)) return instances, partial_instance_ids diff --git a/tests/slurm_plugin/test_fleet_manager.py b/tests/slurm_plugin/test_fleet_manager.py index 795f43fd1..76439b9c8 100644 --- a/tests/slurm_plugin/test_fleet_manager.py +++ b/tests/slurm_plugin/test_fleet_manager.py @@ -834,7 +834,7 @@ def test_launch_instances( generate_error=False, ), ] - + 3 + + 4 * [ MockedBoto3Request( method="describe_instances", @@ -887,7 +887,7 @@ def test_launch_instances( # client error ( ["i-12345"], - 4 + 5 * [ MockedBoto3Request( method="describe_instances", From 7795ad35aee2d2e4bb9fae64ce63ca1f867a13be Mon Sep 17 00:00:00 2001 From: Luca Carrogu Date: Thu, 7 Sep 2023 12:24:30 +0200 Subject: [PATCH 5/5] Accumulate unused capacity over different instance launch calls Accumulate unused capacity over different instance launch calls, when it isn't possible to assign the full requested allocation to a job Signed-off-by: Luca Carrogu --- src/slurm_plugin/instance_manager.py | 32 ++- tests/slurm_plugin/test_instance_manager.py | 269 +++++++++++++++++++- 2 files changed, 288 insertions(+), 13 deletions(-) diff --git a/src/slurm_plugin/instance_manager.py b/src/slurm_plugin/instance_manager.py index c7ee575ef..26846b3bc 100644 --- a/src/slurm_plugin/instance_manager.py +++ b/src/slurm_plugin/instance_manager.py @@ -9,7 +9,6 @@ # OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions and # limitations under the License. 
-import collections import itertools import logging @@ -349,7 +348,7 @@ def _parse_requested_nodes(self, node_list: List[str]) -> defaultdict[str, defau Valid NodeName format: {queue_name}-{st/dy}-{compute_resource_name}-{number} Sample NodeName: queue1-st-computeres1-2 """ - nodes_to_launch = collections.defaultdict(lambda: collections.defaultdict(list)) + nodes_to_launch = defaultdict(lambda: defaultdict(list)) for node in node_list: try: queue_name, node_type, compute_resource_name = parse_nodename(node) @@ -610,6 +609,20 @@ def _clear_unused_launched_instances(self): """Clear and reset unused launched instances list.""" self.unused_launched_instances = {} + def _update_dict(self, target_dict: dict, update: dict) -> dict: + logger.debug("Updating target dict (%s) with update (%s)", target_dict, update) + for update_key, update_value in update.items(): + if isinstance(update_value, dict): + target_dict[update_key] = self._update_dict(target_dict.get(update_key, {}), update_value) + elif isinstance(update_value, list): + target_dict[update_key] = target_dict.get(update_key, []) + update_value + elif isinstance(update_value, set): + target_dict[update_key] = target_dict.get(update_key, set()) | update_value + else: + target_dict[update_key] = update_value + logger.debug("Updated target dict is (%s)", target_dict) + return target_dict + def add_instances( self, node_list: List[str], @@ -783,10 +796,13 @@ def _scaling_for_jobs_multi_node( self, job_list, node_list, launch_batch_size, assign_node_batch_size, terminate_batch_size, update_node_address ): # Optimize job level scaling with preliminary scale-all nodes attempt - self.unused_launched_instances |= self._launch_instances( - nodes_to_launch=self._parse_requested_nodes(node_list), - launch_batch_size=launch_batch_size, - all_or_nothing_batch=True, + self._update_dict( + self.unused_launched_instances, + self._launch_instances( + nodes_to_launch=self._parse_requested_nodes(node_list), + launch_batch_size=launch_batch_size, + all_or_nothing_batch=True, + ), ) self._scaling_for_jobs( @@ -1001,7 +1017,7 @@ def _add_instances_for_job( ] ), ) - self.unused_launched_instances |= instances_launched + self._update_dict(self.unused_launched_instances, instances_launched) self._update_failed_nodes(set(parsed_requested_node), "LimitedInstanceCapacity", override=False) else: # No instances launched at all, e.g. 
CreateFleet API returns no EC2 instances @@ -1017,7 +1033,7 @@ def _launch_instances( all_or_nothing_batch: bool, job: SlurmResumeJob = None, ): - instances_launched = collections.defaultdict(lambda: collections.defaultdict(list)) + instances_launched = defaultdict(lambda: defaultdict(list)) for queue, compute_resources in nodes_to_launch.items(): for compute_resource, slurm_node_list in compute_resources.items(): slurm_node_list = self._resize_slurm_node_list( diff --git a/tests/slurm_plugin/test_instance_manager.py b/tests/slurm_plugin/test_instance_manager.py index be8fea8ec..075476d99 100644 --- a/tests/slurm_plugin/test_instance_manager.py +++ b/tests/slurm_plugin/test_instance_manager.py @@ -13,6 +13,7 @@ import os import re import subprocess +from collections import defaultdict from datetime import datetime, timezone from typing import Iterable from unittest.mock import call @@ -2437,7 +2438,8 @@ def test_update_slurm_node_addrs( @pytest.mark.parametrize( "job, launch_batch_size, assign_node_batch_size, update_node_address, all_or_nothing_batch, " - "expected_nodes_to_launch, mock_instances_launched, expect_assign_instances_to_nodes_called, " + "expected_nodes_to_launch, mock_instances_launched, initial_unused_launched_instances, " + "expected_unused_launched_instances, expect_assign_instances_to_nodes_called, " "expect_assign_instances_to_nodes_failure, expected_failed_nodes", [ ( @@ -2448,6 +2450,8 @@ def test_update_slurm_node_addrs( False, {}, {}, + {}, + {}, False, None, {}, @@ -2460,6 +2464,8 @@ def test_update_slurm_node_addrs( True, {"queue4": {"c5xlarge": ["queue4-st-c5xlarge-1"]}}, {}, + {}, + {}, False, None, {"InsufficientInstanceCapacity": {"queue4-st-c5xlarge-1"}}, @@ -2480,6 +2486,8 @@ def test_update_slurm_node_addrs( ] } }, + {}, + {}, True, None, {}, @@ -2500,6 +2508,8 @@ def test_update_slurm_node_addrs( ] } }, + {}, + {}, True, HostnameDnsStoreError(), {"Exception": {"queue4-st-c5xlarge-1"}}, @@ -2525,6 +2535,16 @@ def test_update_slurm_node_addrs( ] } }, + {}, + { + "queue4": { + "c5xlarge": [ + EC2Instance( + "i-12345", "ip.1.0.0.1", "ip-1-0-0-1", datetime(2020, 1, 1, tzinfo=timezone.utc) + ), + ] + } + }, False, None, {"LimitedInstanceCapacity": {"queue1-st-c5xlarge-1", "queue4-st-c5xlarge-1"}}, @@ -2542,6 +2562,8 @@ def test_update_slurm_node_addrs( True, {"queue1": {"c5xlarge": ["queue1-st-c5xlarge-1"]}, "queue4": {"c5xlarge": ["queue4-st-c5xlarge-1"]}}, {}, + {}, + {}, False, None, {"InsufficientInstanceCapacity": {"queue1-st-c5xlarge-1", "queue4-st-c5xlarge-1"}}, @@ -2559,6 +2581,8 @@ def test_update_slurm_node_addrs( True, {"queue1": {"c5xlarge": ["queue1-st-c5xlarge-1"]}}, {}, + {}, + {}, False, None, {"InsufficientInstanceCapacity": {"queue1-st-c5xlarge-1"}}, @@ -2584,10 +2608,108 @@ def test_update_slurm_node_addrs( ] } }, + {}, + {}, True, None, {}, ), + ( + SlurmResumeJob( + 140819, + "queue1-st-c5xlarge-1, queue4-st-c5xlarge-1", + "queue1-st-c5xlarge-1, queue4-st-c5xlarge-1", + "NO", + ), + 1, + 2, + False, + True, + {"queue1": {"c5xlarge": ["queue1-st-c5xlarge-1"]}, "queue4": {"c5xlarge": ["queue4-st-c5xlarge-1"]}}, + { + "queue4": { + "c5xlarge": [ + EC2Instance( + "i-12345", "ip.1.0.0.1", "ip-1-0-0-1", datetime(2020, 1, 1, tzinfo=timezone.utc) + ), + ] + } + }, + { + "queue10": { + "c5xlarge": [ + EC2Instance( + "i-12345", "ip.1.0.0.1", "ip-1-0-0-1", datetime(2020, 1, 1, tzinfo=timezone.utc) + ), + ] + } + }, + { + "queue4": { + "c5xlarge": [ + EC2Instance( + "i-12345", "ip.1.0.0.1", "ip-1-0-0-1", datetime(2020, 1, 1, tzinfo=timezone.utc) + ), + 
] + }, + "queue10": { + "c5xlarge": [ + EC2Instance( + "i-12345", "ip.1.0.0.1", "ip-1-0-0-1", datetime(2020, 1, 1, tzinfo=timezone.utc) + ), + ] + }, + }, + False, + None, + {"LimitedInstanceCapacity": {"queue1-st-c5xlarge-1", "queue4-st-c5xlarge-1"}}, + ), + ( + SlurmResumeJob( + 140819, + "queue1-st-c5xlarge-1, queue4-st-c5xlarge-1", + "queue1-st-c5xlarge-1, queue4-st-c5xlarge-1", + "NO", + ), + 1, + 2, + False, + True, + {"queue1": {"c5xlarge": ["queue1-st-c5xlarge-1"]}, "queue4": {"c5xlarge": ["queue4-st-c5xlarge-1"]}}, + { + "queue4": { + "c5xlarge": [ + EC2Instance( + "i-12345", "ip.1.0.0.1", "ip-1-0-0-1", datetime(2020, 1, 1, tzinfo=timezone.utc) + ), + ] + } + }, + { + "queue4": { + "c5xlarge": [ + EC2Instance( + "i-12346", "ip.1.0.0.6", "ip-1-0-0-6", datetime(2020, 1, 1, tzinfo=timezone.utc) + ), + ] + } + }, + { + "queue4": { + "c5xlarge": [ + EC2Instance( + "i-12346", "ip.1.0.0.6", "ip-1-0-0-6", datetime(2020, 1, 1, tzinfo=timezone.utc) + ), + EC2Instance( + "i-12345", "ip.1.0.0.1", "ip-1-0-0-1", datetime(2020, 1, 1, tzinfo=timezone.utc) + ), + ] + } + }, + False, + None, + {"LimitedInstanceCapacity": {"queue1-st-c5xlarge-1", "queue4-st-c5xlarge-1"}}, + ), ], ) def test_add_instances_for_job( @@ -2601,15 +2723,18 @@ def test_add_instances_for_job( all_or_nothing_batch, expected_nodes_to_launch, mock_instances_launched, + initial_unused_launched_instances, + expected_unused_launched_instances, expect_assign_instances_to_nodes_called, expect_assign_instances_to_nodes_failure, expected_failed_nodes, ): - # patch internal functions + # patch internal functions and data instance_manager._launch_instances = mocker.MagicMock(return_value=mock_instances_launched) instance_manager._assign_instances_to_nodes = mocker.MagicMock( side_effect=expect_assign_instances_to_nodes_failure ) + instance_manager.unused_launched_instances = initial_unused_launched_instances instance_manager._add_instances_for_job( job, launch_batch_size, assign_node_batch_size, update_node_address, all_or_nothing_batch @@ -2626,16 +2751,15 @@ def test_add_instances_for_job( all_or_nothing_batch=all_or_nothing_batch, ) + assert_that(instance_manager.unused_launched_instances).is_equal_to(expected_unused_launched_instances) if expect_assign_instances_to_nodes_called: instance_manager._assign_instances_to_nodes.assert_called_once() - assert_that(instance_manager.unused_launched_instances).is_empty() if expect_assign_instances_to_nodes_failure: assert_that(instance_manager.failed_nodes).is_equal_to(expected_failed_nodes) else: assert_that(instance_manager.failed_nodes).is_empty() else: instance_manager._assign_instances_to_nodes.assert_not_called() - assert_that(instance_manager.unused_launched_instances).is_equal_to(mock_instances_launched) assert_that(instance_manager.failed_nodes).is_equal_to(expected_failed_nodes) @pytest.mark.parametrize( @@ -3343,7 +3467,6 @@ def test_scaling_for_nodes( ) def test_resize_slurm_node_list( self, - mocker, instance_manager, queue, compute_resource, @@ -3367,6 +3490,142 @@ def test_resize_slurm_node_list( assert_that(new_slurm_node_list).is_equal_to(expected_slurm_node_list) assert_that(instances_launched).is_equal_to(expected_instances_launched) + @pytest.mark.parametrize( + "target_dict, update, expected_dict", + [ + ( + {}, + {}, + {}, + ), + ( + {"q1": {"c1": ["a", "b"]}}, + {}, + {"q1": {"c1": ["a", "b"]}}, + ), + ( + {"q1": {"c1": ["a", "b"]}}, + {"q1": {"c1": ["c"]}}, + {"q1": {"c1": ["a", "b", "c"]}}, + ), + ( + {}, + {"q1": {"c1": ["a", "b"]}}, + {"q1": {"c1": ["a", "b"]}}, + 
), + ( + {"q1": {"c1": {"a", "b"}}}, + {}, + {"q1": {"c1": {"a", "b"}}}, + ), + ( + {"q1": {"c1": {"a", "b"}}}, + {"q1": {"c1": {"c"}}}, + {"q1": {"c1": {"a", "b", "c"}}}, + ), + ( + {}, + {"q1": {"c1": {"a", "b"}}}, + {"q1": {"c1": {"a", "b"}}}, + ), + ( + {"q1": {"c1": 1}}, + {}, + {"q1": {"c1": 1}}, + ), + ( + {"q1": {"c1": 1}}, + {"q1": {"c1": 2}}, + {"q1": {"c1": 2}}, + ), + ( + {}, + {"q1": {"c1": 3}}, + {"q1": {"c1": 3}}, + ), + ( + {"q1": {"c1": ["a", "b"], "c2": ["c"]}, "q2": {"c1": ["d"]}}, + {"q2": {"c1": ["k"]}, "q3": {"c1": ["y"]}}, + {"q1": {"c1": ["a", "b"], "c2": ["c"]}, "q2": {"c1": ["d", "k"]}, "q3": {"c1": ["y"]}}, + ), + ( + defaultdict(lambda: defaultdict(list)), + defaultdict(lambda: defaultdict(list)), + defaultdict(lambda: defaultdict(list)), + ), + ( + defaultdict(lambda: defaultdict(list)), + {"q1": {"c1": ["a", "b"]}}, + {"q1": {"c1": ["a", "b"]}}, + ), + ( + {"q1": {"c1": ["a", "b", "c"]}}, + defaultdict(lambda: defaultdict(list)), + {"q1": {"c1": ["a", "b", "c"]}}, + ), + ( + { + "q1": { + "c1": [ + EC2Instance("q1c1-1", "ip.1.0.0.1", "ip-1-0-0-1", datetime(2020, 1, 1, tzinfo=timezone.utc)) + ], + "c2": [ + EC2Instance("q1c2-1", "ip.1.0.0.2", "ip-1-0-0-2", datetime(2020, 1, 1, tzinfo=timezone.utc)) + ], + } + }, + {}, + { + "q1": { + "c1": [ + EC2Instance("q1c1-1", "ip.1.0.0.1", "ip-1-0-0-1", datetime(2020, 1, 1, tzinfo=timezone.utc)) + ], + "c2": [ + EC2Instance("q1c2-1", "ip.1.0.0.2", "ip-1-0-0-2", datetime(2020, 1, 1, tzinfo=timezone.utc)) + ], + } + }, + ), + ( + { + "q1": { + "c1": [ + EC2Instance("q1c1-1", "ip.1.0.0.1", "ip-1-0-0-1", datetime(2020, 1, 1, tzinfo=timezone.utc)) + ], + "c2": [ + EC2Instance("q1c2-1", "ip.1.0.0.2", "ip-1-0-0-2", datetime(2020, 1, 1, tzinfo=timezone.utc)) + ], + } + }, + { + "q1": { + "c2": [ + EC2Instance("q1c2-2", "ip.1.0.0.3", "ip-1-0-0-3", datetime(2020, 1, 1, tzinfo=timezone.utc)) + ] + } + }, + { + "q1": { + "c1": [ + EC2Instance("q1c1-1", "ip.1.0.0.1", "ip-1-0-0-1", datetime(2020, 1, 1, tzinfo=timezone.utc)) + ], + "c2": [ + EC2Instance( + "q1c2-1", "ip.1.0.0.2", "ip-1-0-0-2", datetime(2020, 1, 1, tzinfo=timezone.utc) + ), + EC2Instance( + "q1c2-2", "ip.1.0.0.3", "ip-1-0-0-3", datetime(2020, 1, 1, tzinfo=timezone.utc) + ), + ], + } + }, + ), + ], + ) + def test_update_dict(self, instance_manager, target_dict, update, expected_dict): + actual_dict = instance_manager._update_dict(target_dict, update) + assert_that(actual_dict).is_equal_to(expected_dict) + class TestNodeListScalingInstanceManager: @pytest.fixture