Skip to content

Commit

Permalink
Accumulate unused capacity over different instance launch calls
Browse files Browse the repository at this point in the history
Accumulate unused capacity over different instance launch calls, when it isn't possible to assign the full requested allocation to a job

Signed-off-by: Luca Carrogu <[email protected]>
  • Loading branch information
lukeseawalker committed Sep 7, 2023
1 parent d0e7356 commit 7795ad3
Show file tree
Hide file tree
Showing 2 changed files with 288 additions and 13 deletions.
32 changes: 24 additions & 8 deletions src/slurm_plugin/instance_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
# OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions and
# limitations under the License.

import collections
import itertools
import logging

Expand Down Expand Up @@ -349,7 +348,7 @@ def _parse_requested_nodes(self, node_list: List[str]) -> defaultdict[str, defau
Valid NodeName format: {queue_name}-{st/dy}-{compute_resource_name}-{number}
Sample NodeName: queue1-st-computeres1-2
"""
nodes_to_launch = collections.defaultdict(lambda: collections.defaultdict(list))
nodes_to_launch = defaultdict(lambda: defaultdict(list))
for node in node_list:
try:
queue_name, node_type, compute_resource_name = parse_nodename(node)
Expand Down Expand Up @@ -610,6 +609,20 @@ def _clear_unused_launched_instances(self):
"""Clear and reset unused launched instances list."""
self.unused_launched_instances = {}

def _update_dict(self, target_dict: dict, update: dict) -> dict:
logger.debug("Updating target dict (%s) with update (%s)", target_dict, update)
for update_key, update_value in update.items():
if isinstance(update_value, dict):
target_dict[update_key] = self._update_dict(target_dict.get(update_key, {}), update_value)
elif isinstance(update_value, list):
target_dict[update_key] = target_dict.get(update_key, []) + update_value
elif isinstance(update_value, set):
target_dict[update_key] = target_dict.get(update_key, set()) | update_value
else:
target_dict[update_key] = update_value
logger.debug("Updated target dict is (%s)", target_dict)
return target_dict

def add_instances(
self,
node_list: List[str],
Expand Down Expand Up @@ -783,10 +796,13 @@ def _scaling_for_jobs_multi_node(
self, job_list, node_list, launch_batch_size, assign_node_batch_size, terminate_batch_size, update_node_address
):
# Optimize job level scaling with preliminary scale-all nodes attempt
self.unused_launched_instances |= self._launch_instances(
nodes_to_launch=self._parse_requested_nodes(node_list),
launch_batch_size=launch_batch_size,
all_or_nothing_batch=True,
self._update_dict(

Check warning on line 799 in src/slurm_plugin/instance_manager.py

View check run for this annotation

Codecov / codecov/patch

src/slurm_plugin/instance_manager.py#L799

Added line #L799 was not covered by tests
self.unused_launched_instances,
self._launch_instances(
nodes_to_launch=self._parse_requested_nodes(node_list),
launch_batch_size=launch_batch_size,
all_or_nothing_batch=True,
),
)

self._scaling_for_jobs(
Expand Down Expand Up @@ -1001,7 +1017,7 @@ def _add_instances_for_job(
]
),
)
self.unused_launched_instances |= instances_launched
self._update_dict(self.unused_launched_instances, instances_launched)
self._update_failed_nodes(set(parsed_requested_node), "LimitedInstanceCapacity", override=False)
else:
# No instances launched at all, e.g. CreateFleet API returns no EC2 instances
Expand All @@ -1017,7 +1033,7 @@ def _launch_instances(
all_or_nothing_batch: bool,
job: SlurmResumeJob = None,
):
instances_launched = collections.defaultdict(lambda: collections.defaultdict(list))
instances_launched = defaultdict(lambda: defaultdict(list))
for queue, compute_resources in nodes_to_launch.items():
for compute_resource, slurm_node_list in compute_resources.items():
slurm_node_list = self._resize_slurm_node_list(
Expand Down
Loading

0 comments on commit 7795ad3

Please sign in to comment.